Spring Framework

Spring Batch Step간 데이터 공유

gregorio 2018. 9. 5. 15:25

Spring Batch framework에서 하나의 Job에 여러개의 Step이 있을 경우 Step간에 데이터를 공유하는 방법이다.


먼저 Job에서 Step을 정의할 때 Spring Batch에서 제공하는 ExecutionContextPromotionListener를 각각의 Step의 Listener로 등록한다.


■ Job 정의 예제


<?xml version="1.0" encoding="UTF-8"?>

<!--
  Job definition for BatchApiKeyResetJob: two chunk-oriented steps that run in
  sequence and hand data from the first to the second through the
  "promotionListener" bean declared below.
-->

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

<!-- Shared application context; presumably declares baseJob, sqlSession and durBaseDao, which are referenced below but not defined in this file. -->

<import resource="classpath:/spring/context-main.xml" />


<!--
  Sends data between steps. The "keys" property names the step
  ExecutionContext entries to hand over to later steps; only a single object
  is shared here, so the only key is "params".
-->

<bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">

<property name="keys" value="params" />

</bean>

<!-- Job flow: BatchApiKeyResetJobStep1, then BatchApiKeyResetJobStep2 for any exit status (on="*"). -->

<job id="BatchApiKeyResetJob" parent="baseJob" xmlns="http://www.springframework.org/schema/batch">

<!-- Step 1: read, process with BatchApiKeyResetJobStepProcessor, write; commits every 10 items. -->

<step id="BatchApiKeyResetJobStep1">

<tasklet>

<chunk reader="BatchApiKeyResetJobStepReader" processor="BatchApiKeyResetJobStepProcessor" writer="BatchApiKeyResetJobStepWriter" commit-interval="10"></chunk>

<listeners>

<!-- listener ref="apiKeyResultJobStepListener" /-->

<!-- Registers the promotion listener on this step so "params" is handed to the next step. -->

<listener ref="promotionListener" />

</listeners>

</tasklet>

<next on="*" to="BatchApiKeyResetJobStep2"/>

</step>

<!-- Step 2: same reader and writer as step 1, but processed by BatchApiKeyResetJobStepProcessor2. -->

<step id="BatchApiKeyResetJobStep2" parent="BatchApiKeyResetJobStep1">

<tasklet>

<chunk reader="BatchApiKeyResetJobStepReader" processor="BatchApiKeyResetJobStepProcessor2" writer="BatchApiKeyResetJobStepWriter" commit-interval="10"></chunk>

<listeners>

<!-- listener ref="apiKeyResultJobStepListener" /-->

<listener ref="promotionListener" />

</listeners>

</tasklet>

</step>

</job>

<!-- Reader: step-scoped MyBatis paging reader that runs the retrieveInterfaceList query. -->

<bean id="BatchApiKeyResetJobStepReader" class="org.mybatis.spring.batch.MyBatisPagingItemReader" scope="step">

<property name="queryId" value="common.BatchInterface_SQL.retrieveInterfaceList" />

<property name="sqlSessionFactory" ref="sqlSession" />

</bean>

<!-- Processor for step 1: stores each processed item under the "params" key of the step context. -->

<bean id="BatchApiKeyResetJobStepProcessor" class="batch.processor.BatchApiKeyResetJobProcessor" >

<property name="baseDao" ref="durBaseDao" />

</bean>

<!-- Processor for step 2: reads the promoted "params" entry from the job context before the step starts. -->

<bean id="BatchApiKeyResetJobStepProcessor2" class="batch.processor.BatchApiKeyResetJobProcessor2" >

<property name="baseDao" ref="durBaseDao" />

</bean>


<!-- Writer: step-scoped MyBatis batch writer that runs the updateInterface statement; assertUpdates is switched off. -->

<bean id="BatchApiKeyResetJobStepWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step">

<property name="statementId" value="common.BatchInterface_SQL.updateInterface" />

<property name="sqlSessionFactory" ref="sqlSession" />

<property name="assertUpdates" value="false" />

</bean>

</beans>


각 Step간에 전달할 keys를 정의한다. 여기서는 하나의 객체만 전달하기 때문에 params로 값을 정의하였다.


ItemProcessor에서 StepExecution의 ExecutionContext에 params를 키로 사용하여 값을 저장한다.


■BatchApiKeyResetJobProcessor.java


import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

import cmn.util.dao.CamelMap;
import cmn.util.spring.batch.processor.BaseProcessor;
import sehati.batch.service.TestService;


/**
 * Item processor for the first step of the API-key reset job.
 *
 * <p>For every item it generates a new key, stores it on the item under
 * {@code intfAuthCrtfId}, and publishes the item in the step's
 * {@link ExecutionContext} under the {@code params} key so that a promotion
 * listener configured with that key can hand it to later steps.
 *
 * <p>The {@code params} and {@code uuid} entries are overwritten on every
 * item, so only the values of the last processed item remain when the step
 * ends.
 */
public class BatchApiKeyResetJobProcessor extends BaseProcessor<CamelMap, CamelMap> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor.class);

    @Resource(name="testService")
    private TestService testService;

    /** Execution of the step currently running; captured in {@link #beforeStep(StepExecution)}. */
    private StepExecution stepExecution;

    /**
     * Assigns a freshly generated key to the item and records it in the step context.
     *
     * @param camelMap the item read for this chunk
     * @return the same item, with {@code intfAuthCrtfId} set to the new key
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public CamelMap executeProcess(CamelMap camelMap) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Input Data :: {}", camelMap);
        }

        // baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);

        // Upper-case UUID without separators; replace() avoids compiling a regex for a literal.
        String uuid = UUID.randomUUID().toString().toUpperCase().replace("-", "");
        camelMap.put("intfAuthCrtfId", uuid);

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("uuid", uuid);
        // "params" must match the key configured on the promotion listener.
        stepContext.put("params", camelMap);

        return camelMap;
    }

    /**
     * Keeps a reference to the running step so {@code executeProcess} can reach its context.
     *
     * @param stepExecution execution of the step that is about to start
     */
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    /**
     * Logs the last generated key and runs the follow-up update when the step
     * did not finish with the {@code COMPLETED} exit code.
     *
     * @param stepExecution execution of the step that just finished
     * @return the step's exit status, unchanged
     */
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        // The key is absent when no item was processed; the defaulted lookup
        // returns null in that case instead of failing the listener.
        String uuid = stepExecution.getExecutionContext().getString("uuid", null);
        LOGGER.debug("UUID :: {}", uuid);

        ExitStatus exitStatus = stepExecution.getExitStatus();

        // Exit codes are Strings: compare by value, not by reference.
        if (!ExitStatus.COMPLETED.getExitCode().equals(exitStatus.getExitCode())) {
            try {
                testService.updateData(new CamelMap());
            } catch (Exception e) {
                // Best effort: a failure here must not replace the step's own exit status.
                LOGGER.error("Follow-up update after non-completed step failed", e);
            }
        }

        return exitStatus;
    }
}


먼저 Step이 실행되기 전에 stepExecution을 ItemProcessor에 정의되어 있는 stepExecution 변수에 저장한다.


ItemProcessor에서 stepExecution을 이용하여 stepContext 객체를 얻은 후 promotionListener Bean에서 정의한 keys의 value를 키로 사용하여 stepContext에 다음 Step에 전달할 데이터를 저장한다.



■BatchApiKeyResetJobProcessor2.java


import java.util.UUID;


import javax.annotation.Resource;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.core.ExitStatus;

import org.springframework.batch.core.JobExecution;

import org.springframework.batch.core.StepExecution;

import org.springframework.batch.core.annotation.AfterStep;

import org.springframework.batch.core.annotation.BeforeStep;

import org.springframework.batch.item.ExecutionContext;


import cmn.util.dao.CamelMap;

import cmn.util.spring.batch.processor.BaseProcessor;

import sehati.batch.service.TestService;


public class BatchApiKeyResetJobProcessor2 extends BaseProcessor<CamelMap, CamelMap> {

private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor2.class);

@Resource(name="testService")

private TestService testService;

private StepExecution stepExecution;

@Override

public CamelMap executeProcess(CamelMap camelMap) throws Exception {

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("Input Data :: {}", camelMap);

}

baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);

String uuid = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");

camelMap.put("intfAuthCrtfId", uuid);

stepExecution.getExecutionContext().put("uuid", uuid);

ExecutionContext stepContext = this.stepExecution.getExecutionContext();

stepContext.put("params", camelMap);


return camelMap;

}

@BeforeStep

public void beforeStep(StepExecution stepExecution) {

JobExecution jobExecution = stepExecution.getJobExecution();

ExecutionContext jobContext = jobExecution.getExecutionContext();

CamelMap camelMap = (CamelMap) jobContext.get("params");

LOGGER.info("{}", camelMap);

this.stepExecution = stepExecution; 

}

@AfterStep

public ExitStatus afterStep(StepExecution stepExecution) {

if( stepExecution.getExitStatus().getExitCode() != ExitStatus.COMPLETED.getExitCode()) {

try {

testService.updateData(new CamelMap());

} catch (Exception e) {

e.printStackTrace();

}

}

return stepExecution.getExitStatus();

}


}


ItemProcessor의 @BeforeStep 메서드에서, 이전 Step이 stepContext에 저장하고 promotionListener가 Job의 ExecutionContext로 승격(promote)시킨 객체를 Step이 실행되기 전에 받아온다.


이렇게 하면 Step간에 데이터를 공유할 수 있다.


물론 실 환경에 적용하기 위해서는 더 많은 고민이 필요할 거 같다.

예를 들면 Chunk 방식으로 처리하는 경우 Step간에 공유할 데이터를 어디에서 생성할 것인지, 또는 AfterStep에서 처리하는 경우 ItemProcessor에서 AfterStep으로 데이터를 보내는 방법 등 해결해야 할 이슈가 있다.


물론 예제에서 보듯이 ItemProcessor에서 stepExecution의 ExecutionContext에 객체를 저장하는 방법이 있다.