아키텍처와 함께

by gregorio

This post shows how to share data between Steps when a single Job in the Spring Batch framework contains several Steps.


First, when defining the Steps in the Job, register the ExecutionContextPromotionListener provided by Spring Batch as a listener on each Step.


■ Job definition example


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <import resource="classpath:/spring/context-main.xml" />

    <!-- Send Data between steps -->
    <bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <property name="keys" value="params" />
    </bean>

    <job id="BatchApiKeyResetJob" parent="baseJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="BatchApiKeyResetJobStep1">
            <tasklet>
                <chunk reader="BatchApiKeyResetJobStepReader" processor="BatchApiKeyResetJobStepProcessor" writer="BatchApiKeyResetJobStepWriter" commit-interval="10"></chunk>
                <listeners>
                    <!-- listener ref="apiKeyResultJobStepListener" /-->
                    <listener ref="promotionListener" />
                </listeners>
            </tasklet>
            <next on="*" to="BatchApiKeyResetJobStep2"/>
        </step>
        <step id="BatchApiKeyResetJobStep2" parent="BatchApiKeyResetJobStep1">
            <tasklet>
                <chunk reader="BatchApiKeyResetJobStepReader" processor="BatchApiKeyResetJobStepProcessor2" writer="BatchApiKeyResetJobStepWriter" commit-interval="10"></chunk>
                <listeners>
                    <!-- listener ref="apiKeyResultJobStepListener" /-->
                    <listener ref="promotionListener" />
                </listeners>
            </tasklet>
        </step>
    </job>

    <!-- Reader Setting -->
    <bean id="BatchApiKeyResetJobStepReader" class="org.mybatis.spring.batch.MyBatisPagingItemReader" scope="step">
        <property name="queryId" value="common.BatchInterface_SQL.retrieveInterfaceList" />
        <property name="sqlSessionFactory" ref="sqlSession" />
    </bean>

    <!-- Processor Setting -->
    <bean id="BatchApiKeyResetJobStepProcessor" class="batch.processor.BatchApiKeyResetJobProcessor" >
        <property name="baseDao" ref="durBaseDao" />
    </bean>

    <!-- Processor Setting -->
    <bean id="BatchApiKeyResetJobStepProcessor2" class="batch.processor.BatchApiKeyResetJobProcessor2" >
        <property name="baseDao" ref="durBaseDao" />
    </bean>

    <!-- Writer Setting -->
    <bean id="BatchApiKeyResetJobStepWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step">
        <property name="statementId" value="common.BatchInterface_SQL.updateInterface" />
        <property name="sqlSessionFactory" ref="sqlSession" />
        <property name="assertUpdates" value="false" />
    </bean>

</beans>


Define the keys to be passed between Steps. Only one object is passed here, so the single value params is used.
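
To make the role of keys concrete, here is a minimal plain-Java model of what promotion amounts to: when a step ends, the entries whose keys are listed are copied from the step's context into the job's context, and everything else stays behind. This is a sketch, not Spring Batch's implementation. The class PromotionModel and its promote method are hypothetical, and plain Maps stand in for ExecutionContext.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of key promotion; plain maps stand in for ExecutionContext. */
public class PromotionModel {

    /** Copies only the listed keys from the step-level map into the job-level map. */
    public static void promote(Map<String, Object> stepContext,
                               Map<String, Object> jobContext,
                               String... keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        stepContext.put("params", "item-data"); // listed in keys, so it is promoted
        stepContext.put("uuid", "ABC123");      // not listed, so it stays in the step

        Map<String, Object> jobContext = new HashMap<>();
        promote(stepContext, jobContext, "params");

        System.out.println("params promoted: " + jobContext.containsKey("params"));
        System.out.println("uuid promoted: " + jobContext.containsKey("uuid"));
    }
}
```

In this post's job definition only params is listed, so any other entry written to the step's context (such as the uuid value the processor stores) is not carried over to the next Step.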


In the ItemProcessor, use the StepExecution to store the value in the step's ExecutionContext under the key params.


■ BatchApiKeyResetJobProcessor.java


import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

// Project classes (same imports as BatchApiKeyResetJobProcessor2)
import cmn.util.dao.CamelMap;
import cmn.util.spring.batch.processor.BaseProcessor;
import sehati.batch.service.TestService;

public class BatchApiKeyResetJobProcessor extends BaseProcessor<CamelMap, CamelMap> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor.class);

    @Resource(name="testService")
    private TestService testService;

    // Set in beforeStep so executeProcess can reach the step's ExecutionContext.
    private StepExecution stepExecution;

    @Override
    public CamelMap executeProcess(CamelMap camelMap) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Input Data :: {}", camelMap);
        }

        // baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);

        String uuid = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");
        camelMap.put("intfAuthCrtfId", uuid);

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        // "uuid" is only read back in afterStep; it is not listed in keys, so it is not promoted.
        stepContext.put("uuid", uuid);
        // "params" matches the promotionListener keys, so it is copied to the job context.
        stepContext.put("params", camelMap);

        return camelMap;
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Default-value overload: avoids an exception when no item was processed.
        String uuid = stepExecution.getExecutionContext().getString("uuid", null);
        LOGGER.debug("UUID :: {}", uuid);

        ExitStatus exitStatus = stepExecution.getExitStatus();
        // Exit codes are Strings, so compare with equals() rather than !=.
        if (!ExitStatus.COMPLETED.getExitCode().equals(exitStatus.getExitCode())) {
            try {
                testService.updateData(new CamelMap());
            } catch (Exception e) {
                LOGGER.error("updateData failed after the step did not complete", e);
            }
        }

        return exitStatus;
    }

}


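First, before the Step runs, the @BeforeStep method stores the StepExecution in the ItemProcessor's stepExecution field.


The ItemProcessor then obtains the step's ExecutionContext (stepContext) from stepExecution and stores the data to pass to the next Step under the key defined in the keys property of the promotionListener bean. Because the same key is written for every item, each item overwrites the previous one, and the value that gets promoted is whatever was stored last.



■ BatchApiKeyResetJobProcessor2.java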


import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

import cmn.util.dao.CamelMap;
import cmn.util.spring.batch.processor.BaseProcessor;
import sehati.batch.service.TestService;

public class BatchApiKeyResetJobProcessor2 extends BaseProcessor<CamelMap, CamelMap> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchApiKeyResetJobProcessor2.class);

    @Resource(name="testService")
    private TestService testService;

    private StepExecution stepExecution;

    @Override
    public CamelMap executeProcess(CamelMap camelMap) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Input Data :: {}", camelMap);
        }

        baseDao.update("sehati.common.BatchInterface_SQL.testupdate1", camelMap);

        String uuid = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");
        camelMap.put("intfAuthCrtfId", uuid);

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("uuid", uuid);
        stepContext.put("params", camelMap);

        return camelMap;
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        // The value promoted by the previous step lives in the job's ExecutionContext.
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        CamelMap camelMap = (CamelMap) jobContext.get("params");
        LOGGER.info("{}", camelMap);

        this.stepExecution = stepExecution;
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Exit codes are Strings, so compare with equals() rather than !=.
        if (!ExitStatus.COMPLETED.getExitCode().equals(stepExecution.getExitStatus().getExitCode())) {
            try {
                testService.updateData(new CamelMap());
            } catch (Exception e) {
                LOGGER.error("updateData failed after the step did not complete", e);
            }
        }

        return stepExecution.getExitStatus();
    }

}


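In the second ItemProcessor, before the Step runs, the @BeforeStep method retrieves the object that the previous Step stored in its stepContext. After promotion, that object is read from the Job's ExecutionContext.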
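
The promoted value may be absent, for instance when the previous Step processed no items, so a direct cast can hide a null. As a minimal sketch of a more defensive read, the helper below returns an Optional that is empty when the key is missing or holds a different type. PromotedValueReader and its read method are hypothetical names, and a plain Map stands in for the Job's ExecutionContext.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper; a plain map stands in for the job's ExecutionContext. */
public class PromotedValueReader {

    /** Returns the value under key if it is present and of the expected type, else empty. */
    public static <T> Optional<T> read(Map<String, Object> jobContext, String key, Class<T> type) {
        Object value = jobContext.get(key);
        // isInstance(null) is false, so a missing key yields Optional.empty().
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> jobContext = new HashMap<>();
        jobContext.put("params", "promoted-by-step-1");

        System.out.println(read(jobContext, "params", String.class).orElse("(nothing promoted)"));
        System.out.println(read(jobContext, "missing", String.class).orElse("(nothing promoted)"));
    }
}
```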


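This is how data can be shared between Steps.


Of course, applying this in a real environment needs more thought.

For example, with chunk-oriented processing you have to decide where the data shared between Steps should be created, and if it is created in @AfterStep, how the ItemProcessor passes its data to @AfterStep.


As the example shows, one option is for the ItemProcessor to store the object on the stepExecution.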
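
One way to think about the chunk-processing question is to compare overwriting a key on every item with accumulating items under that key. The sketch below is a plain-Java illustration under assumptions: SharedDataSketch, putLatest, and append are hypothetical names, a Map stands in for the step's ExecutionContext, and Strings stand in for the processed items. A growing list is probably only reasonable for small amounts of data, since the real context is saved to the job repository.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; a plain map stands in for the step's ExecutionContext. */
public class SharedDataSketch {

    /** Overwrites the key on every item, as the processors in this post do. */
    public static void putLatest(Map<String, Object> stepContext, String key, String item) {
        stepContext.put(key, item);
    }

    /** Appends each item to a list kept under the key, so earlier items are not lost. */
    @SuppressWarnings("unchecked")
    public static void append(Map<String, Object> stepContext, String key, String item) {
        List<String> items =
                (List<String>) stepContext.computeIfAbsent(key, k -> new ArrayList<String>());
        items.add(item);
    }

    public static void main(String[] args) {
        Map<String, Object> overwriting = new HashMap<>();
        Map<String, Object> accumulating = new HashMap<>();

        for (String item : new String[] {"item-1", "item-2", "item-3"}) {
            putLatest(overwriting, "params", item);
            append(accumulating, "params", item);
        }

        System.out.println("overwrite keeps: " + overwriting.get("params"));
        System.out.println("append keeps: " + accumulating.get("params"));
    }
}
```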
