Pages

Senin, 03 Maret 2014

Spring batch working with the listeners

Q. Why do you have listeners in Spring batch?
A. A reader reads the data, a writer writes the chunked data, and a processor filters out records before they are passed to the ItemWriter. Filtering is an action distinct from skipping; skipping indicates that a record is invalid, whereas filtering simply indicates that a record should not be written.

Listeners are callbacks that are invoked before or after a particular point in the batch life cycle, for example before or after a step executes, an item is read, or a chunk is written.


Here is an example that uses all the listeners except for the ItemProcessListener.


package com.mysample.job;

import java.util.List;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

/**
 * Skeleton listener that hooks into every Spring Batch callback shown in this
 * example: job, step, chunk, item read, item write and skip events. It does not
 * implement ItemProcessListener.
 *
 * <p>All callbacks are intentionally empty; fill in the ones you need.
 *
 * <p>The item type is {@code String} throughout, so the write listener is
 * parameterised with {@code String} as well. The write callbacks already
 * receive the chunk as a {@code List} of items, so the type argument must be
 * the item type, not a list of it.
 */
public class SampleListener implements ItemWriteListener<String>, StepExecutionListener, JobExecutionListener,
        ItemReadListener<String>, ChunkListener, SkipListener<String, String> {

    // ---- ItemReadListener ----

    /** Called after an item has been read. */
    @Override
    public void afterRead(String item) {
    }

    /** Called before each read. */
    @Override
    public void beforeRead() {
    }

    /** Called when the reader throws. */
    @Override
    public void onReadError(Exception exception) {
    }

    // ---- JobExecutionListener ----

    /** Called once the job has finished. */
    @Override
    public void afterJob(JobExecution jobExecution) {
    }

    /** Called before the job starts. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    // ---- StepExecutionListener ----

    /**
     * Called after the step has finished.
     *
     * @return {@code null}, i.e. this listener does not contribute an exit status
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    /** Called before the step starts. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    // ---- ItemWriteListener ----

    /** Called after a chunk of items has been written. */
    @Override
    public void afterWrite(List<? extends String> items) {
    }

    /** Called before a chunk of items is written. */
    @Override
    public void beforeWrite(List<? extends String> items) {
    }

    /** Called when writing a chunk of items fails. */
    @Override
    public void onWriteError(Exception exception, List<? extends String> items) {
    }

    // ---- SkipListener ----

    /** Called when an item is skipped during processing. */
    @Override
    public void onSkipInProcess(String item, Throwable cause) {
    }

    /** Called when a read is skipped; no item is available in this case. */
    @Override
    public void onSkipInRead(Throwable cause) {
    }

    /** Called when an item is skipped during writing. */
    @Override
    public void onSkipInWrite(String item, Throwable cause) {
    }

    // ---- ChunkListener ----

    /** Called after a chunk has been processed. */
    @Override
    public void afterChunk() {
    }

    /** Called before a chunk is processed. */
    @Override
    public void beforeChunk() {
    }
}


Q. Where do you declare a listener?
A. In the Spring config file.

    <!-- Job "sampleJob": declared non-restartable, with a single chunk-oriented step. -->
    <job id="sampleJob" restartable="false"
xmlns="http://www.springframework.org/schema/batch">
<step id="sampleStep">
<!-- The tasklet runs on the "taskExecutor" bean, which is referenced here but defined elsewhere. -->
<tasklet task-executor="taskExecutor">
<!-- Items flow from sampleReader to sampleWriter; commit-interval is 3 items. -->
<chunk reader="sampleReader" writer="sampleWriter"
commit-interval="3">
</chunk>
<!-- Listeners are registered on the tasklet by bean reference. -->
<listeners>
<listener ref="sampleListener" />
</listeners>
</tasklet>
</step>
</job>

<!--
  Step-scoped listener bean: scope="step" is what allows the #{...} late-binding
  expressions below to be resolved against the running step and job.
  The class name must match the package the listener is declared in (com.mysample.job).
-->
<bean id="sampleListener" scope="step" class="com.mysample.job.SampleListener">
<!-- Value stored under the JOB_ID key of the step execution context. -->
<property name="jobId" value="#{stepExecutionContext['JOB_ID']}" />
<!-- Value of the RUN_DATE job parameter. -->
<property name="runDate" value="#{jobParameters['RUN_DATE']}" />
</bean>


Q. What do you understand by the terms StepExecution, JobExecution, JobExecutionContext, and StepExecutionContext?
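A. A JobExecution represents a single run of a job and a StepExecution represents a single run of a step within it. Each of them carries an ExecutionContext (the job execution context and the step execution context, respectively), which is a key/value store. You can use these contexts to pass data between readers and writers. For example, you may want to keep track of the failed accounts by storing them in a job control table. These accounts can be read again from the job control table and rerun in the next job run.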

// Store the failed items in the step's execution context, keyed by the enum constant's name.
stepExecution.getExecutionContext().put(ValidationJobMapKey.FAILED_ACCOUNTS.name(), failedItems);
// Store the job info in the job's execution context, keyed the same way.
jobExecution.getExecutionContext().put(ValidationJobMapKey.JOB_INFO.name(), info);

Here is a more comprehensive sample listener code.

package com.mysample.job;

//...

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.annotation.Resource;

import org.joda.time.Instant;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
//....

/**
* writes batch run data to the job control table after write and after step
* failed accounts are stored in the stepExecution context
*/
/**
 * Writes batch run data to the job control table after each chunk write and
 * again after the step has finished.
 *
 * <p>Accounts whose write failed are accumulated in the step execution context
 * under {@code ConstantEnum.FAILED_ACCOUNTS} and flushed to the job control
 * table in {@link #afterStep(StepExecution)}.
 *
 * <p>{@link #beforeStep(StepExecution)} must have run before any write callback,
 * because it captures the step execution and the job id used by the other
 * callbacks.
 */
public class SampleListener implements ItemWriteListener<TradeAccount>, StepExecutionListener {

    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);

    @Resource(name = "sampleService")
    private SampleService sampleService;

    private Long jobId;
    private Date runDate;

    /**
     * Start time of the write currently in progress on the calling thread.
     * Kept per thread because the step may run chunks concurrently on a task
     * executor; a plain shared field would be overwritten by other threads and
     * produce wrong durations.
     */
    private final ThreadLocal<Instant> startTime = new ThreadLocal<Instant>();

    private StepExecution stepExecution;

    /** Remembers when the write of the current chunk started. */
    @Override
    public void beforeWrite(List<? extends TradeAccount> items) {
        startTime.set(new Instant());
    }

    /**
     * Records the successfully written items in the job control table with
     * status COMPLETED and logs how long the write took.
     *
     * <p>A failure to update the job control table is logged and otherwise
     * ignored, so it never fails the chunk.
     */
    @Override
    public void afterWrite(List<? extends TradeAccount> items) {
        Instant writeStarted = startTime.get();
        logger.info("Finish call service, duration: {}", elapsedSince(writeStarted));
        try {
            JobControlMetaData meta = new JobControlMetaData();
            meta.setJobId(this.jobId);
            meta.setJobRunDate(runDate);
            // Spring Batch gives you the ExitStatus
            meta.setJobStatus(ExitStatus.COMPLETED);
            sampleService.updateJobControlTable(meta, items);
        } catch (Exception e) {
            logger.error("Failed to update job control: ", e);
        }

        logger.info("Finish write, duration: {}", elapsedSince(writeStarted));
        // Do not leave a stale value behind on a pooled worker thread.
        startTime.remove();
    }

    /**
     * Logs the failed chunk together with its cause and remembers the failed
     * accounts in the step execution context.
     *
     * <p>Transactions are rolled back here.
     */
    @Override
    public void onWriteError(Exception exception, List<? extends TradeAccount> items) {
        // Passing the exception as the last argument makes SLF4J log its stack trace.
        logger.error("Error to process accounts: {}", items, exception);
        startTime.remove();
        handleFailedAccounts(exception, items);
    }

    /**
     * Stores the failed accounts in the step execution context.
     *
     * <p>Each account is tagged with the exception message first. The method is
     * synchronized so that concurrent chunk failures do not lose each other's
     * additions to the shared list.
     */
    private synchronized void handleFailedAccounts(Exception exception, List<? extends TradeAccount> items) {
        for (TradeAccount tradeAccount : items) {
            tradeAccount.setErrorMsg(exception.getMessage());
        }

        @SuppressWarnings("unchecked")
        List<TradeAccount> failedItems =
                (List<TradeAccount>) stepExecution.getExecutionContext().get(ConstantEnum.FAILED_ACCOUNTS.name());
        if (failedItems == null) {
            failedItems = new ArrayList<TradeAccount>();
        }

        failedItems.addAll(items);

        // ConstantEnum is an enum class with constant values.
        stepExecution.getExecutionContext().put(ConstantEnum.FAILED_ACCOUNTS.name(), failedItems);
    }

    public Long getJobId() {
        return jobId;
    }

    public void setJobId(Long jobId) {
        this.jobId = jobId;
    }

    public Date getRunDate() {
        return runDate;
    }

    public void setRunDate(Date runDate) {
        this.runDate = runDate;
    }

    /**
     * Writes the failed accounts collected during the step to the job control
     * table with status FAILED.
     *
     * <p>A failure to update the job control table is logged and otherwise
     * ignored.
     *
     * @return {@code null}, i.e. this listener does not contribute an exit status
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        @SuppressWarnings("unchecked")
        List<TradeAccount> failedItems =
                (List<TradeAccount>) stepExecution.getExecutionContext().get(ConstantEnum.FAILED_ACCOUNTS.name());
        if (failedItems != null && !failedItems.isEmpty()) {
            try {
                JobControlMetaData meta = new JobControlMetaData();
                meta.setJobId(this.jobId);
                meta.setJobRunDate(runDate);
                // Spring Batch gives you the ExitStatus
                meta.setJobStatus(ExitStatus.FAILED);
                sampleService.updateJobControlTable(meta, failedItems);
            } catch (Exception e) {
                logger.error("Failed to update job control: ", e);
            }
        }
        return null;
    }

    /**
     * Captures the step execution for later callbacks and publishes the job id
     * in the step execution context under {@code ConstantEnum.JOB_ID}.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        // Spring allocates job ids, so get it.
        jobId = stepExecution.getJobExecution().getJobId();
        stepExecution.getExecutionContext().put(ConstantEnum.JOB_ID.name(), jobId);
    }

    /**
     * Formats the time elapsed between {@code start} and now.
     * A {@code null} start is treated by Joda-Time as "now", giving a zero duration.
     */
    private static String elapsedSince(Instant start) {
        return getDuration(new Interval(start, new Instant()).toPeriod());
    }

    /**
     * Formats the time fields of a period as {@code H:mm:ss.SSS}.
     * Minutes, seconds and milliseconds are zero-padded so that, for example,
     * 1 s 5 ms prints as {@code 0:00:01.005} rather than the ambiguous {@code 0:0:1.5}.
     */
    private static String getDuration(Period period) {
        return String.format("%d:%02d:%02d.%03d",
                period.getHours(), period.getMinutes(), period.getSeconds(), period.getMillis());
    }
}


0 komentar:

Posting Komentar