400+ Java Interview Questions and Answers blog |
Spring batch advanced tutorial -- writing your own reader Posted: 19 Dec 2012 04:08 PM PST
"Portfolio1","29/02/2012","11/03/2012", "Portfolio1","Account1","OPENBAL", 2000.00 "Portfolio1 ","Account1","PURCHASE",1000.00 "Portfolio1 ","Account1","EXPENSE",500.00 "Portfolio1 ","Account1","ADJUSTMENT ", 200.00 "Portfolio1","Account1","OPENBAL ", 12000.00 "Portfolio1 ","Account2","PURCHASE",1000.00 "Portfolio1 ","Account3","ADJUSTMENT",1000.00 So, wee need to write a custom file reader that can peek into the next record before reading it. Step 1: Snippets of the spring batch context configuration file.E..g.applicationContext-myapp.xml. <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:int="http://www.springframework.org/schema/integration" xmlns:file="http://www.springframework.org/schema/integration/file" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> <!-- load properties file--> <context:property-placeholder location="classpath:myapp.properties" /> <!-- annotation driven injection --> <tx:annotation-driven /> <!-- define the job that reads from a CSV file and write to a database--> <job id="myAppJob" xmlns="http://www.springframework.org/schema/batch"> <listeners> <listener ref="myAppJobExecutionListener" /> </listeners> <step id="loadMyAppFeedData"> <tasklet transaction-manager="transactionManager"> <listeners> <listener ref="stepExecutionListener" /> </listeners> <chunk reader="groupMyAppDetailsReader" writer="myAppFileItemWriter" commit-interval="10" /> </tasklet> </step> </job> <!-- Spring supplied File Item Reader that reads CSV file line by line--> <bean id="myAppFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> <property name="resource" value="#{jobParameters['dataFileName']}" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="names" value="portfolioCd,accountCd,transactionType, Amount" /> </bean> </property> <property name="fieldSetMapper"> <bean class="com.myapp.mapper.MyAppFieldSetMapper" /> </property> </bean> </property> <property name="linesToSkip" value="1" /> <property name="skippedLinesCallback" ref="myAppFileHeaderLineCallbackHandler" /> </bean> <!-- My custom CSV file Reader that groups data but it internally makes use of the Spring's FileItemReader--> <bean id="groupMyAppDetailsReader" class="com.myapp.item.reader.myAppItemReader"> <property name="delegate" ref="myAppFileItemReader" /> </bean> <!-- My custome File Item Writer --> <bean id="myAppFileItemWriter" class="com.myapp.item.writer.MyAppItemWriter" /> <!-- The Step execution context listener that can be injected to propagate step values --> <bean id="stepExecutionListener" class="com.myapp.StepExecutionListenerCtxInjecter" /> </beans> Step 2: The custom reader can be implemented as shown below. The key here is that peeking the next record to enable grouping and making use of the Spring provided FileItemReader as a delegate to read each CSV line. package com.myapp.item.reader; import java.util.ArrayList; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import com.myapp.model.TransactionDetail; import com.myapp.model.MyAppPortfolioParent; public class MyAppFileItemReader implements ItemStreamReader<MyAppPortfolioParent> { private ItemStreamReader<TransactionDetail> delegate; private TransactionDetail curItem = null; @Override public MyAppPortfolioParent read() { MyAppPortfolioParent parent = null; try { if (curItem == null) { curItem = delegate.read(); } if (curItem != null) { parent = new MyAppPortfolioParent(); parent.setBalanceDetail(curItem); } curItem = null; if (parent != null) { parent.setTxnDetails(new ArrayList<TransactionDetail>()); TransactionDetail detail = peek(); while (detail != null && !"OPENBAL".equalsIgnoreCase(peek().getTxnCd())) { parent.getTxnDetails().add(curItem); curItem = null; detail = peek(); } } } catch (Exception e) { e.printStackTrace(); } return parent; } public TransactionDetail peek() throws Exception, UnexpectedInputException, ParseException { if (curItem == null) { curItem = delegate.read(); } return curItem; } @Override public void close() throws ItemStreamException { delegate.close(); } @Override public void open(ExecutionContext arg0) throws ItemStreamException { delegate.open(arg0); } @Override public void update(ExecutionContext arg0) throws ItemStreamException { delegate.update(arg0); } public void setDelegate(ItemStreamReader<TransactionDetail> delegate) { this.delegate = delegate; } } Step 3: The utility class that can be used to inject the step and job execution contexts into your reader, processor, or writer classes. package com.myapp.util; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.annotation.BeforeStep; import org.springframework.batch.item.ExecutionContext; public class StepExecutionListenerCtxInjecter { private ExecutionContext stepExecutionCtx; private ExecutionContext jobExecutionCtx; @BeforeStep public void beforeStep(StepExecution stepExecution) { stepExecutionCtx = stepExecution.getExecutionContext(); jobExecutionCtx = stepExecution.getJobExecution().getExecutionContext(); } public ExecutionContext getStepExecutionCtx() { return stepExecutionCtx; } public ExecutionContext getJobExecutionCtx() { return jobExecutionCtx; } } Step 4: As you could see in the spring config file that we are skipping the first record, which is the header record and defined a LineCallBackHandler to handle the header records. Here is the implementation of this handler. package com.myapp.handler; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.file.LineCallbackHandler; import org.springframework.stereotype.Component; import com.myapp.dao.MyAppingDao; import com.myapp.model.MyAppMeta; import com.myapp.util.CashforecastingUtil; import com.myapp.util.StepExecutionListenerCtxInjecter; @Component(value = "myAppFileHeaderLineCallbackHandler") public class MyAppFileHeaderCallbackHandler implements LineCallbackHandler { private static final Logger LOGGER = LoggerFactory.getLogger(MyAppFileHeaderCallbackHandler.class); public static final String FEED_HEADER_DATA = "feedHeaderData"; @Resource(name = "myappFeedDao") private MyAppDao myappDao; @Resource(name = "stepExecutionListener") private StepExecutionListenerCtxInjecter stepExecutionListener; @Override public void handleLine(String headerLine) { LOGGER.debug("header line: {}", headerLine); //convert CSV data into MyAppMeta cfMeta = MyAppUtil.getMyAppMetaFromHeader(headerLine, null); // logical delete current records int noOfRecordsLogicallyDeleted = myappDao.logicallyDelete(cfMeta); LOGGER.info("No of records logically deleted: " + noOfRecordsLogicallyDeleted); //save it in the job execution context stepExecutionListener.getJobExecutionCtx().put(FEED_HEADER_DATA, cfMeta); } } Step 5: The FileItemReader has a mapper defined to map each row to an object. We need to define this object that gets invoked when each CSV line item is read to convert each field to an object as shown below. package com.myapp.mapper; import java.math.BigDecimal; import java.text.ParseException; import org.apache.commons.lang.time.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; import com.myapp.model.MyAppDetail; public class MyAppFieldSetMapper implements FieldSetMapper<MyAppDetail> { private final static Logger logger = LoggerFactory.getLogger(CashForecastFieldSetMapper.class); @Override public MyAppDetail mapFieldSet(FieldSet fs) throws BindException { if (fs == null) { return null; } MyAppDetail detail = new MyAppDetail(); detail.setPortfolioCd(fs.readString("portfolioCd")); detail.setAccountCd(fs.readString("accountCd")); detail.setTxnCd(fs.readString("txnCd")); BigDecimal cashValue = fs.readBigDecimal("cashValue"); detail.setCashValue(cashValue != null ? cashValue : BigDecimal.ZERO); return detail; } } Step 6: The writer class that is responsible for writing a group of items (i.e. parent and children records) to the database. package com.myapp.item.writer; import java.util.List; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemWriter; import com.myapp.dao.myappingDao; import com.myapp.handler.myappFileHeaderCallbackHandler; import com.myapp.model.myappDetail; import com.myapp.model.myappMeta; import com.myapp.model.myappParent; import com.myapp.util.StepExecutionListenerCtxInjecter; public class MyAppItemWriter implements ItemWriter<MyAppParent> { @Resource(name = "stepExecutionListener") private StepExecutionListenerCtxInjecter stepExecutionListener; // to get the step and job contexts @Resource(name = "myappFeedDao") private myappingDao myappDao; //dao class for saving records into database private final static Logger logger = LoggerFactory.getLogger(MyappItemWriter.class); @Override public void write(List portfolioDetails) { //retrieving previously stored data from the job context myappMeta pfMeta = (myappMeta) stepExecutionListener.getJobExecutionCtx().get( MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA); int batchJobId = -1; //retrieving previously stored data from the job context if (stepExecutionListener.getJobExecutionCtx().get("batchJobId") != null) { batchJobId = stepExecutionListener.getJobExecutionCtx().getInt("batchJobId"); } pfMeta.setBatchJobId(batchJobId); try { for (myappParent cfp : portfolioDetails) { MyappDetail bd = cfp.getBalanceDetail(); // save cash forcasting balances int noOfRecords = myappDao.saveMyappBalance(bd, pfMeta); logger.info("No of cashforcast balance records inserted " + noOfRecords); int syntheticId = myappDao.getmyappId(bd, pfMeta); // save myapping transaction records List<Myappdetail> txnDetails = cfp.getTxnDetails(); for (myappDetail txd : txnDetails) { myappDao.saveMyappDetail(txd, syntheticId); } } } catch (Exception e) { logger.error("myappItemWriter error", e); throw new RuntimeException(e); } if (logger.isDebugEnabled()) { logger.debug("Commiting chunks to the database ...... "); } } } |
You are subscribed to email updates from 400+ Java Interview Questions and Answers blog To stop receiving these emails, you may unsubscribe now. | Email delivery powered by Google |
Google Inc., 20 West Kinzie, Chicago IL USA 60610 |
Post a Comment