作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
Alexey Saenko's profile image

Alexey Saenko

Alexey是一名经过认证的Java SE程序员, Java EE Web服务和业务组件开发人员, 主要精通Java和大数据.

Previously At

T-Systems
Share

批处理——以面向批量为特征, non-interactive, and frequently long running, 后台执行——几乎在每个行业都被广泛使用,并被应用于各种各样的任务. 批处理可能是数据密集型或计算密集型的, execute sequentially or in parallel, 并且可以通过各种调用模型启动, including ad hoc, scheduled, and on-demand.

本Spring批处理教程解释了批处理应用程序的编程模型和领域语言, in particular, 控件设计和开发批处理应用程序的一些有用方法 Spring Batch 3.0.7 version.

What is Spring Batch?

Spring Batch is a lightweight, 全面的框架,旨在促进开发健壮的批处理应用程序. 它还提供更先进的技术服务和特性,通过优化和分区技术支持超大容量和高性能批处理作业. 的基于pojo的开发方法 Spring Framework, familiar to all experienced Spring developers.

By way of example, 本文将考虑加载xml格式的客户文件的示例项目的源代码, 根据各种属性筛选客户, 并将过滤后的条目输出到文本文件. 我们的Spring Batch示例的源代码(它使用了 Lombok annotations) is available here 在GitHub上,需要Java SE 8和Maven.

What is Batch Processing? Key Concepts and Terminology

对于任何批处理开发人员来说,熟悉和熟悉批处理的主要概念是很重要的. 下图是批处理参考架构的简化版本,该架构已经在许多不同的平台上实现了数十年. 它介绍了Spring batch使用的批处理相关的关键概念和术语.

春季批教程:关键概念和术语

如我们的批处理示例所示,批处理通常由 Job consisting of multiple Steps. Each Step typically has a single ItemReader, ItemProcessor, and ItemWriter. A Job is executed by a JobLauncher,有关已配置和已执行作业的元数据存储在 JobRepository.

Each Job may be associated with multiple JobInstanceS,每一个都是由它的特殊性定义的 JobParameters that are used to start a batch job. Each run of a JobInstance is referred to as a JobExecution. Each JobExecution 通常跟踪运行过程中发生的事情, such as current and exit statuses, start and end times, etc.

A Step 是一个独立的,特定的阶段的批 Job, such that every Job is composed of one or more Steps. Similar to a Job, a Step has an individual StepExecution ,表示执行的单次尝试 Step. StepExecution 存储有关当前和退出状态的信息, start and end times, and so on, 以及对其相应的引用 Step and JobExecution instances.

An ExecutionContext 一组键-值对是否包含的信息的作用域为其中之一 StepExecution or JobExecution. Spring Batch persists the ExecutionContext,在需要重新启动批处理运行的情况下(例如.g.、发生致命错误时等.). 所需要做的就是将任何要在步骤之间共享的对象放入上下文中,框架将处理其余的工作. 重启后,从先前的值 ExecutionContext 是否从数据库中恢复并应用.

JobRepository Spring Batch中的机制使所有这些持久化成为可能吗. It provides CRUD operations for JobLauncher, Job, and Step instantiations. Once a Job is launched, a JobExecution 从存储库中获得,并且在执行过程中, StepExecution and JobExecution 实例被持久化到存储库中.

Spring批处理框架入门

Spring Batch的优点之一是项目依赖最小, 这使得启动和快速运行更容易. 少数确实存在的依赖在项目的文档中有明确的说明和解释 pom.xml, which can be accessed here.

应用程序的实际启动发生在一个类中,看起来像下面这样:

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
    public static void main(String[] args) {
        prepareTestData(1000);
        SpringApplication.run(BatchApplication.class, args);
    }
}

The @EnableBatchProcessing annotation启用了Spring Batch特性,并为设置批处理作业提供了基本配置.

The @SpringBootApplication annotation comes from the Spring Boot 项目,提供独立的、生产就绪的、基于spring的应用程序. 它指定了一个配置类,该类声明一个或多个Spring bean,并触发自动配置和Spring的组件扫描.

我们的示例项目只有一个作业,该作业由 CustomerReportJobConfig with an injected JobBuilderFactory and StepBuilderFactory. 中定义最小作业配置 CustomerReportJobConfig as follows:

@Configuration
公共类CustomerReportJobConfig {
    @Autowired
    private JobBuilderFactory;

    @Autowired
    StepBuilderFactory;

    @Bean
    public Job customerReportJob() {
        return jobBuilders.get("customerReportJob")
            .start(taskletStep())
            .next(chunkStep())
            .build();
    }

    @Bean
    public Step taskletStep() {
        return stepBuilders.get("taskletStep")
            .tasklet(tasklet())
            .build();
    }

    @Bean
    public Tasklet tasklet() {
        return (contribution, chunkContext) -> {
            return RepeatStatus.FINISHED;
        };
    }
}

构建步骤有两种主要方法.

如上面的例子所示,一种方法是 tasklet-based. A Tasklet 支持只有一个方法的简单接口, execute(),该方法被反复调用,直到它返回 RepeatStatus.FINISHED 或抛出异常以表示失败. Each call to the Tasklet is wrapped in a transaction.

Another approach, chunk-oriented processing, 指按顺序读取数据并创建将在事务边界内写入的“块”. 每个单独的项都是从 ItemReader, handed to an ItemProcessor, and aggregated. 一旦读取的项数等于提交间隔,整个块将通过 ItemWriter,然后提交事务. 面向块的步骤可以配置如下:

@Bean
public Job customerReportJob() {
    return jobBuilders.get("customerReportJob")
        .start(taskletStep())
        .next(chunkStep())
        .build();
}

@Bean
public Step chunkStep() {
    return stepBuilders.get("chunkStep")
        .chunk(20)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}

The chunk() 方法构建一个步骤,该步骤按所提供的大小按块处理项, 然后将每个块传递给指定的读取器, processor, and writer. 本文的下一节将更详细地讨论这些方法.

Custom Reader

对于我们的Spring Batch示例应用程序, 以便从XML文件中读取客户列表, 我们需要提供接口的实现 org.springframework.batch.item.ItemReader:

public interface ItemReader {
    T read()抛出异常,expectedinputexception, ParseException, NonTransientResourceException;
}

An ItemReader 提供数据,并且预期是有状态的. 通常对每个批处理调用多次,每次调用 read() 返回下一个值,最后返回 null 当所有输入数据耗尽时.

的一些开箱即用的实现 ItemReader, 哪些可以用于各种目的,例如阅读集合, files, 集成JMS和JDBC以及多个源, and so on.

In our sample application, the CustomerItemReader class delegates actual read() 类的惰性初始化实例 IteratorItemReader class:

public class CustomerItemReader implements ItemReader {

    private final String filename;

    private ItemReader delegate;

    公共CustomerItemReader(最终字符串文件名){
        this.filename = filename;
    }

    @Override
    public Customer read()抛出异常{
        if (delegate == null) {
            delegate = new IteratorItemReader<>(customers());
        }
        return delegate.read();
    }

    private List customers() throws FileNotFoundException {
        try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) {
            return (List) decoder.readObject();
        }
    }
}

创建此实现的Spring bean @Component and @StepScope annotations, 让Spring知道这个类是一个step-scope的Spring组件,每执行一步就会创建一次,如下所示:

@StepScope
@Bean
public ItemReader reader() {
    返回新的CustomerItemReader(XML_FILE);
}

Custom Processors

ItemProcessors 转换输入项并在面向项的处理场景中引入业务逻辑. 它们必须提供接口的实现 org.springframework.batch.item.ItemProcessor:

public interface ItemProcessor {
    O process(I item) throws Exception;
}

The method process() accepts one instance of the I 类的实例,可能返回也可能不返回相同类型的实例. Returning null 指示不应继续处理该项. 和往常一样,Spring提供的标准处理器很少,比如 CompositeItemProcessor 它通过一系列的注入 ItemProcessors and a ValidatingItemProcessor that validates input.

在我们的示例应用程序中, 处理器用于根据以下要求过滤客户:

  • 客户必须是在当月出生的.g.为生日特色菜打旗号等.)
  • 客户完成的交易必须少于五笔(例如.g., to identify newer customers)

“当前月份”需求是通过自定义实现的 ItemProcessor:

public class BirthdayFilterProcessor implements ItemProcessor {
    @Override
    公共客户进程(最终客户项)抛出异常{
        if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) {
            return item;
        }
        return null;
    }
}

“事务数量有限”的要求被实现为 ValidatingItemProcessor:

public class TransactionValidatingProcessor extends ValidatingItemProcessor {
    公共TransactionValidatingProcessor(最终int限制){
        super(
            item -> {
                if (item.getTransactions() >= limit) {
                    抛出新的ValidationException("客户拥有少于" + limit + "事务");
                }
            }
        );
        setFilter(true);
    }
}

然后将这对处理器封装在 CompositeItemProcessor 它实现了委托模式:

@StepScope
@Bean
public ItemProcessor processor() {
    final CompositeItemProcessor processor = new CompositeItemProcessor<>();
    processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5)));
    return processor;
}

Custom Writers

为了输出数据,Spring Batch提供了接口 org.springframework.batch.item.ItemWriter 根据需要序列化对象:

public interface ItemWriter {
    void write(List items) throws Exception;
}

The write() 方法负责确保刷新所有内部缓冲区. If a transaction is active, 通常还需要在随后的回滚中丢弃输出. 写入器向其发送数据的资源通常应该能够自己处理这个问题. 有一些标准实现,例如 CompositeItemWriter, JdbcBatchItemWriter, JmsItemWriter, JpaItemWriter, SimpleMailMessageItemWriter, and others.

在我们的示例应用程序中,过滤的客户列表如下所示:

public class CustomerItemWriter implements ItemWriter, Closeable {
    private final PrintWriter writer;

    public CustomerItemWriter() {
        OutputStream out;
        try {
            out = new FileOutputStream("output.txt");
        } catch (FileNotFoundException e) {
            out = System.out;
        }
        this.writer = new PrintWriter(out);
    }

    @Override
    public void write(final List items) throws Exception {
        for (Customer item : items) {
            writer.println(item.toString());
        }
    }

    @PreDestroy
    @Override
    close()抛出IOException {
        writer.close();
    }
}

Scheduling Spring Batch Jobs

默认情况下,Spring Batch执行它能找到的所有作业.e., that are configured as in CustomerReportJobConfig) at startup. 要更改此行为,请在启动时禁用作业执行,方法是将以下属性添加到 application.properties:

spring.batch.job.enabled=false

实际的调度是通过添加 @EnableScheduling 注释到配置类和 @Scheduled 注释到执行作业本身的方法. 调度可以配置delay、rates或cron表达式:

// run every 5000 msec (i.e., every 5 secs)
@Scheduled(fixedRate = 5000)
public void run()抛出异常{
    jobeexecution = jobLauncher.run(
        customerReportJob(),
        new JobParametersBuilder().toJobParameters()
    );
}

上面的例子有一个问题. 在运行时,作业将只在第一次成功. 当它第二次启动时(i.e. after five seconds), 它将在日志中生成以下消息(注意,在Spring Batch a的早期版本中 JobInstanceAlreadyCompleteException would have been thrown):

INFO 36988 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=customerReportJob]]以以下参数启动:[{}]
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.步骤已经完成或不能重新启动, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.步骤已经完成或不能重新启动, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

This happens because only unique JobInstances可以被创建和执行,并且Spring Batch没有办法区分第一和第二 JobInstance.

在调度批处理作业时,有两种方法可以避免此问题.

一是确保引入一个或多个唯一参数(例如.g.(以纳秒为单位的实际开始时间):

@Scheduled(fixedRate = 5000)
public void run()抛出异常{
    jobLauncher.run(
        customerReportJob(),
        new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters()
    );
}

或者,您可以按以下顺序启动下一个作业 JobInstances determined by the JobParametersIncrementer attached to the specified job with SimpleJobOperator.startNextInstance():

@Autowired
private JobOperator operator;
 
@Autowired
private JobExplorer jobs;
 
@Scheduled(fixedRate = 5000)
public void run()抛出异常{
    List lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1);
    if (lastInstances.isEmpty()) {
        jobLauncher.运行(customerReportJob(), new JobParameters());
    } else {
        operator.startNextInstance(JOB_NAME);
    }
}

Spring Batch Unit Testing

通常,要在Spring Boot应用程序中运行单元测试,框架必须加载相应的 ApplicationContext. 为此使用了两个注释:

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {...})

There is a utility class org.springframework.batch.test.JobLauncherTestUtils to test batch jobs. 它提供了启动整个作业的方法,并允许对各个步骤进行端到端测试,而不必运行作业中的每个步骤. 它必须声明为一个Spring bean:

@Configuration
BatchTestConfiguration {
    @Bean
    public JobLauncherTestUtils () {
        return new JobLauncherTestUtils();
    }
}

作业和步骤的典型测试如下(也可以使用任何mock框架):

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
公共类CustomerReportJobConfigTest {

    @Autowired
    private joblaunchertesttils;

    @Autowired
    private CustomerReportJobConfig配置;

    @Test
    testoutjob()抛出异常{
        final jobeexecution result = testtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters());
        Assert.assertNotNull(result);
        Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus());
    }

    @Test
    public void testSpecificStep() {
        Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus());
    }
}

Spring Batch为步骤和作业上下文引入了额外的作用域. 这些作用域中的对象使用Spring容器作为对象工厂, 因此,每个执行步骤或作业只有一个这样的bean实例. 类中可访问的引用的后期绑定也提供了支持 StepContext or JobContext. 在运行时配置为步骤作用域或作业作用域的组件很难作为独立组件进行测试,除非您有办法将上下文设置为它们处于步骤或作业执行中. That is the goal of the org.springframework.batch.test.StepScopeTestExecutionListener and org.springframework.batch.test.StepScopeTestUtils 组件,以及 JobScopeTestExecutionListener and JobScopeTestUtils.

The TestExecutionListeners are declared at the class level, 它的工作是为每个测试方法创建一个步骤执行上下文. For example:

@RunWith(SpringRunner.class)
@TestExecutionListeners ({DependencyInjectionTestExecutionListener.类,StepScopeTestExecutionListener.class})
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
BirthdayFilterProcessorTest {

    @Autowired
    private BirthdayFilterProcessor处理器;

    getStepExecution() {
        return MetaDataInstanceFactory.createStepExecution();
    }

    @Test
    public void filter()抛出异常{
        final Customer Customer = new Customer();
        customer.setId(1);
        customer.setName("name");
        customer.setBirthday(新GregorianCalendar ());
        Assert.assertNotNull(processor.process(customer));
    }

}

There are two TestExecutionListeners. 一个来自常规的Spring Test框架,并从配置的应用程序上下文处理依赖注入. The other is the Spring Batch StepScopeTestExecutionListener 这为单元测试中的依赖注入设置了步骤作用域上下文. A StepContext 是为测试方法的持续时间而创建的,并且对注入的任何依赖项都是可用的. 默认的行为是创建一个 StepExecution with fixed properties. Alternatively, the StepContext 是否可以由测试用例作为返回正确类型的工厂方法提供.

Another approach is based on the StepScopeTestUtils utility class. 这个类用于创建和操作 StepScope 在单元测试中更灵活地使用依赖注入. For example, 读取上述处理器过滤的客户ID可以如下所示:

@Test
public void filterId()抛出异常{
    final Customer Customer = new Customer();
    customer.setId(1);
    customer.setName("name");
    customer.setBirthday(新GregorianCalendar ());
    final int id = StepScopeTestUtils.doInStepScope(
        getStepExecution(),
        () -> processor.process(customer).getId()
    );
    Assert.assertEquals(1, id);
}

Ready for Advanced Spring Batch?

本文介绍了Spring Batch应用程序设计和开发的一些基础知识. However, 还有许多更高级的主题和功能,例如缩放, parallel processing, listeners, 还有更多的问题在本文中没有讨论. 希望本文能够为您的入门提供有用的基础.

有关这些更高级主题的信息可以在 official Spring Back documentation for Spring Batch.

Understanding the basics

  • What is Spring Batch?

    Spring Batch is a lightweight, 全面的框架,旨在促进开发健壮的批处理应用程序. 它还提供更先进的技术服务和特性,通过优化和分区技术支持超大容量和高性能批处理作业.

  • 春季批处理中“作业”和“步骤”之间的关系是什么?

    A “Step” 是一个独立的,特定的阶段的批 “Job”, 这样每个作业都由一个或多个步骤组成.

  • 什么是春季批处理中的“作业存储库”?

    “JobRepository”是Spring Batch中使所有这些持久化成为可能的机制. 它为JobLauncher、Job和Step实例化提供CRUD操作.

  • 构建Spring Batch的两种主要方法是什么?

    One approach is tasklet-based, 其中Tasklet支持一个简单的接口与一个单一的execute()方法. The other approach, chunk-oriented processing, 指按顺序读取数据并创建将在事务边界内写入的“块”.

Hire a Toptal expert on this topic.
Hire Now
Alexey Saenko's profile image
Alexey Saenko

Located in Berlin, Germany

Member since July 12, 2016

About the author

Alexey是一名经过认证的Java SE程序员, Java EE Web服务和业务组件开发人员, 主要精通Java和大数据.

Toptal作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

Previously At

T-Systems

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

Toptal Developers

Join the Toptal® community.