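Sharing Data Between Steps in Spring Batch
This post describes how to share data between Steps when a single Job in the Spring Batch framework contains several Steps.
First, when defining the Steps in the Job, register the ExecutionContextPromotionListener provided by Spring Batch as a listener on each Step.
■ Job definition example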
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <import resource="classpath:/spring/context-main.xml" />

    <!-- Send Data between steps -->
    <bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <property name="keys" value="params" />
    </bean>

    <job id="BatchApiKeyResetJob" parent="baseJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="BatchApiKeyResetJobStep1">
            <tasklet>
                <chunk reader="BatchApiKeyResetJobStepReader"
                       processor="BatchApiKeyResetJobStepProcessor"
                       writer="BatchApiKeyResetJobStepWriter"
                       commit-interval="10" />
                <listeners>
                    <!-- listener ref="apiKeyResultJobStepListener" /-->
                    <listener ref="promotionListener" />
                </listeners>
            </tasklet>
            <next on="*" to="BatchApiKeyResetJobStep2" />
        </step>
        <step id="BatchApiKeyResetJobStep2" parent="BatchApiKeyResetJobStep1">
            <tasklet>
                <chunk reader="BatchApiKeyResetJobStepReader"
                       processor="BatchApiKeyResetJobStepProcessor2"
                       writer="BatchApiKeyResetJobStepWriter"
                       commit-interval="10" />
                <listeners>
                    <!-- listener ref="apiKeyResultJobStepListener" /-->
                    <listener ref="promotionListener" />
                </listeners>
            </tasklet>
        </step>
    </job>

    <!-- Reader Setting -->
    <bean id="BatchApiKeyResetJobStepReader" class="org.mybatis.spring.batch.MyBatisPagingItemReader" scope="step">
        <property name="queryId" value="common.BatchInterface_SQL.retrieveInterfaceList" />
        <property name="sqlSessionFactory" ref="sqlSession" />
    </bean>

    <!-- Processor Setting -->
    <bean id="BatchApiKeyResetJobStepProcessor" class="batch.processor.BatchApiKeyResetJobProcessor">
        <property name="baseDao" ref="durBaseDao" />
    </bean>

    <!-- Processor Setting -->
    <bean id="BatchApiKeyResetJobStepProcessor2" class="batch.processor.BatchApiKeyResetJobProcessor2">
        <property name="baseDao" ref="durBaseDao" />
    </bean>

    <!-- Writer Setting -->
    <bean id="BatchApiKeyResetJobStepWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step">
        <property name="statementId" value="common.BatchInterface_SQL.updateInterface" />
        <property name="sqlSessionFactory" ref="sqlSession" />
        <property name="assertUpdates" value="false" />
    </bean>

</beans>
The keys property lists the keys to pass from one Step to the next. Only one object is passed here, so a single key, params, is defined.
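To make the role of keys concrete, here is a stand-alone sketch of what promotion amounts to. It is not Spring Batch code: plain maps stand in for the step-level and job-level ExecutionContext, and the class PromotionSketch and its promote method are hypothetical names used only for illustration. As far as I understand the listener, it copies the configured keys from the step's context to the job's context when the step ends, and by default only when the step completed normally.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustration only: plain maps stand in for the step-level and
 * job-level ExecutionContext. This is not the Spring Batch implementation.
 */
public class PromotionSketch {

    /** Copies each listed key that exists in the step context into the job context. */
    public static void promote(Map<String, Object> stepContext,
                               Map<String, Object> jobContext,
                               List<String> keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        Map<String, Object> jobContext = new HashMap<>();

        // Step 1 stores two entries, but only "params" is listed for promotion.
        stepContext.put("params", "payload-from-step1");
        stepContext.put("uuid", "stays-in-step-context");

        promote(stepContext, jobContext, List.of("params"));

        System.out.println(jobContext.containsKey("params")); // prints "true"
        System.out.println(jobContext.containsKey("uuid"));   // prints "false"
    }
}
```

Anything stored under a key that is not listed in keys, such as uuid in the example on this page, stays in the step's own context and is not visible to the next Step.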
In the ItemProcessor, the value is stored under the params key through the StepExecution.
■ BatchApiKeyResetJobProcessor.java
import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

import cmn.util.dao.CamelMap;
// Project classes, imported the same way as in BatchApiKeyResetJobProcessor2
import cmn.util.spring.batch.processor.BaseProcessor;
import sehati.batch.service.TestService;

public class BatchApiKeyResetJobProcessor extends BaseProcessor<CamelMap, CamelMap> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor.class);

    @Resource(name="testService")
    private TestService testService;

    // Set in beforeStep so that executeProcess can reach the step's ExecutionContext
    private StepExecution stepExecution;

    @Override
    public CamelMap executeProcess(CamelMap camelMap) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Input Data :: {}", camelMap);
        }
        // baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);
        String uuid = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");
        camelMap.put("intfAuthCrtfId", uuid);

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        // "uuid" is not listed in keys, so it stays in this step's context
        stepContext.put("uuid", uuid);
        // "params" matches the promotionListener keys and is handed to the next step
        stepContext.put("params", camelMap);
        return camelMap;
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Use the default-value overload: "uuid" is absent if no item was processed
        String uuid = stepExecution.getExecutionContext().getString("uuid", null);
        LOGGER.debug("UUID :: {}", uuid);
        ExitStatus exitStatus = stepExecution.getExitStatus();
        // Compare exit codes with equals(); != on Strings compares references
        if (!ExitStatus.COMPLETED.getExitCode().equals(exitStatus.getExitCode())) {
            try {
                testService.updateData(new CamelMap());
            } catch (Exception e) {
                LOGGER.error("updateData failed", e);
            }
        }
        return exitStatus;
    }
}
First, before the Step runs, the StepExecution is saved in the stepExecution field declared in the ItemProcessor.
The ItemProcessor then obtains the step's ExecutionContext (stepContext) from stepExecution and stores the data for the next Step in it, using the key given as the value of keys on the promotionListener bean.
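One consequence of storing the value inside the per-item processing method is worth spelling out: every item writes to the same params key, so each write replaces the previous one and only the last item's value is left when the Step ends. The sketch below shows just that overwrite behaviour with a plain map standing in for the step's ExecutionContext. LastWriteSketch and processAll are hypothetical names, not part of Spring Batch or of the example project.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: a plain map stands in for the step's ExecutionContext. */
public class LastWriteSketch {

    /** Mimics a processor that stores every item under the same key. */
    public static Map<String, Object> processAll(List<String> items) {
        Map<String, Object> stepContext = new HashMap<>();
        for (String item : items) {
            // Same key for every item, so the previous value is replaced
            stepContext.put("params", item);
        }
        return stepContext;
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = processAll(List.of("item-1", "item-2", "item-3"));
        System.out.println(stepContext.get("params")); // prints "item-3"
    }
}
```

If the next Step needs data from every item rather than from the last one, the value has to be collected in some other way before it is stored.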
■ BatchApiKeyResetJobProcessor2.java
import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

import cmn.util.dao.CamelMap;
import cmn.util.spring.batch.processor.BaseProcessor;
import sehati.batch.service.TestService;

public class BatchApiKeyResetJobProcessor2 extends BaseProcessor<CamelMap, CamelMap> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor2.class);

    @Resource(name="testService")
    private TestService testService;

    private StepExecution stepExecution;

    @Override
    public CamelMap executeProcess(CamelMap camelMap) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Input Data :: {}", camelMap);
        }
        baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);
        String uuid = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");
        camelMap.put("intfAuthCrtfId", uuid);

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("uuid", uuid);
        stepContext.put("params", camelMap);
        return camelMap;
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        // The promoted value is read from the job-level ExecutionContext
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        CamelMap camelMap = (CamelMap) jobContext.get("params");

        LOGGER.info("{}", camelMap);
        this.stepExecution = stepExecution;
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Compare exit codes with equals(); != on Strings compares references
        if (!ExitStatus.COMPLETED.getExitCode().equals(stepExecution.getExitStatus().getExitCode())) {
            try {
                testService.updateData(new CamelMap());
            } catch (Exception e) {
                LOGGER.error("updateData failed", e);
            }
        }
        return stepExecution.getExitStatus();
    }
}
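In this ItemProcessor, before the Step runs, the object that the previous Step stored in its stepContext is retrieved from the Job's ExecutionContext, where the promotionListener copied it.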
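The lookup of params can come back empty, for example when the previous Step processed no items and therefore never stored anything. A small defensive read, sketched here with a plain map standing in for the job-level ExecutionContext, makes that case explicit instead of leaving a null to surface later. PromotedValueSketch and read are hypothetical names used only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustration only: a plain map stands in for the job-level ExecutionContext. */
public class PromotedValueSketch {

    /** Returns the value under key if it is present and of the expected type. */
    public static <T> Optional<T> read(Map<String, Object> jobContext, String key, Class<T> type) {
        Object value = jobContext.get(key);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> jobContext = new HashMap<>();

        // Nothing was promoted yet, so the read yields an empty Optional
        System.out.println(read(jobContext, "params", String.class).isPresent()); // prints "false"

        jobContext.put("params", "payload-from-step1");
        System.out.println(read(jobContext, "params", String.class).get()); // prints "payload-from-step1"
    }
}
```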
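This is how data can be shared between Steps.
Of course, applying this in a production environment will need more thought.
For example, with chunk-oriented processing there are still open issues, such as where the data shared between Steps should be created, or, if it is created in AfterStep, how to get data from the ItemProcessor to AfterStep.
As the example shows, one option is for the ItemProcessor to store the object through stepExecution.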
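Another way to get data from the per-item method to the after-step callback is to collect it in a field of the processor and publish it once at the end. The sketch below shows only that pattern, again with a plain map standing in for the step's ExecutionContext. AccumulateSketch and its methods are hypothetical names. Two assumptions are untested here: that the promotion listener runs after the processor's own after-step callback (this depends on listener ordering), and that a single processor instance handles the whole Step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: collect per-item data in a field and publish it once. */
public class AccumulateSketch {

    // Per-step state; a real processor would have to reset this before each run
    private final List<String> collected = new ArrayList<>();

    /** Stand-in for the per-item processing method. */
    public String process(String item) {
        collected.add(item);
        return item;
    }

    /** Stand-in for an after-step callback: store a copy of everything collected. */
    public void afterStep(Map<String, Object> stepContext) {
        stepContext.put("params", new ArrayList<>(collected));
    }

    public static void main(String[] args) {
        AccumulateSketch processor = new AccumulateSketch();
        processor.process("item-1");
        processor.process("item-2");

        Map<String, Object> stepContext = new HashMap<>();
        processor.afterStep(stepContext);
        System.out.println(stepContext.get("params")); // prints "[item-1, item-2]"
    }
}
```

The trade-off is that state held in a field is not saved with the step's context, so it would be lost if the Step is restarted after a failure.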