400+ Java Interview Questions and Answers blog



Spring batch advanced tutorial -- writing your own reader

Posted: 19 Dec 2012 04:08 PM PST


My previous 3-part Spring Batch tutorial covered a high-level overview with examples. This tutorial demonstrates how to write your own item reader that wraps Spring's FlatFileItemReader, so that you can peek at the next record and group records the way you want. For example, say you have a CSV file like the one shown below, where

  1. The first line is the header with the portfolio name, the transaction from-date, and the transaction to-date.
  2. The remaining rows are transaction detail records grouped by account code. Each detail record contains a portfolio code, account code, transaction type, and transaction amount.
  3. The records need to be read and processed as groups: a group starts at a record with the transaction type "OPENBAL" and runs up to the record before the next "OPENBAL". In the CSV feed shown below there are 2 groups: records 2 to 5 form one group, and records 6 to 8 form another.


"Portfolio1","29/02/2012","11/03/2012",  "Portfolio1","Account1","OPENBAL", 2000.00  "Portfolio1  ","Account1","PURCHASE",1000.00  "Portfolio1  ","Account1","EXPENSE",500.00  "Portfolio1  ","Account1","ADJUSTMENT ", 200.00  "Portfolio1","Account1","OPENBAL ", 12000.00  "Portfolio1  ","Account2","PURCHASE",1000.00  "Portfolio1  ","Account3","ADJUSTMENT",1000.00  


So, we need to write a custom item reader that can peek at the next record before reading it.

Step 1: Snippets of the Spring Batch context configuration file, e.g. applicationContext-myapp.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:task="http://www.springframework.org/schema/task"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:file="http://www.springframework.org/schema/integration/file"
  xmlns:util="http://www.springframework.org/schema/util"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

  <!-- load the properties file -->
  <context:property-placeholder location="classpath:myapp.properties" />

  <!-- scan for annotated components such as the header line callback handler -->
  <context:component-scan base-package="com.myapp" />

  <!-- annotation driven injection -->
  <tx:annotation-driven />

  <!-- define the job that reads from a CSV file and writes to a database -->
  <job id="myAppJob" xmlns="http://www.springframework.org/schema/batch">
    <listeners>
      <listener ref="myAppJobExecutionListener" />
    </listeners>

    <step id="loadMyAppFeedData">
      <tasklet transaction-manager="transactionManager">
        <listeners>
          <listener ref="stepExecutionListener" />
        </listeners>

        <chunk reader="groupMyAppDetailsReader" writer="myAppFileItemWriter" commit-interval="10" />
      </tasklet>
    </step>
  </job>

  <!-- Spring supplied flat file item reader that reads the CSV file line by line -->
  <bean id="myAppFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{jobParameters['dataFileName']}" />
    <property name="lineMapper">
      <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
          <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="portfolioCd,accountCd,txnCd,cashValue" />
          </bean>
        </property>
        <property name="fieldSetMapper">
          <bean class="com.myapp.mapper.MyAppFieldSetMapper" />
        </property>
      </bean>
    </property>
    <property name="linesToSkip" value="1" />
    <property name="skippedLinesCallback" ref="myAppFileHeaderLineCallbackHandler" />
  </bean>

  <!-- My custom CSV file reader that groups the data; internally it delegates to Spring's FlatFileItemReader -->
  <bean id="groupMyAppDetailsReader" class="com.myapp.item.reader.MyAppFileItemReader">
    <property name="delegate" ref="myAppFileItemReader" />
  </bean>

  <!-- My custom file item writer -->
  <bean id="myAppFileItemWriter" class="com.myapp.item.writer.MyAppItemWriter" />

  <!-- The step execution context listener that can be injected to propagate step values -->
  <bean id="stepExecutionListener" class="com.myapp.util.StepExecutionListenerCtxInjecter" />

</beans>


Step 2: The custom reader can be implemented as shown below. The key here is peeking at the next record to enable grouping, while using the Spring supplied FlatFileItemReader as a delegate to read each CSV line.

package com.myapp.item.reader;

import java.util.ArrayList;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppPortfolioParent;

public class MyAppFileItemReader implements ItemStreamReader<MyAppPortfolioParent> {

    private ItemStreamReader<MyAppDetail> delegate;
    private MyAppDetail curItem = null;

    @Override
    public MyAppPortfolioParent read() {
        MyAppPortfolioParent parent = null;
        try {
            // read the next record unless it has already been buffered by peek()
            if (curItem == null) {
                curItem = delegate.read();
            }

            // the current record (an "OPENBAL" record) starts a new group
            if (curItem != null) {
                parent = new MyAppPortfolioParent();
                parent.setBalanceDetail(curItem);
            }

            curItem = null;

            // keep adding detail records to the group until the next "OPENBAL" record or end of file
            if (parent != null) {
                parent.setTxnDetails(new ArrayList<MyAppDetail>());
                MyAppDetail detail = peek();
                while (detail != null && !"OPENBAL".equalsIgnoreCase(detail.getTxnCd())) {
                    parent.getTxnDetails().add(curItem);
                    curItem = null;
                    detail = peek();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return parent;
    }

    public MyAppDetail peek() throws Exception, UnexpectedInputException, ParseException {
        // read ahead only if the look-ahead buffer is empty
        if (curItem == null) {
            curItem = delegate.read();
        }
        return curItem;
    }

    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

    @Override
    public void open(ExecutionContext arg0) throws ItemStreamException {
        delegate.open(arg0);
    }

    @Override
    public void update(ExecutionContext arg0) throws ItemStreamException {
        delegate.update(arg0);
    }

    public void setDelegate(ItemStreamReader<MyAppDetail> delegate) {
        this.delegate = delegate;
    }
}
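As a side note, Spring Batch itself ships a SingleItemPeekableItemReader (in org.springframework.batch.item.support) that provides a similar peek() capability. If it is available in your version, you could reuse it for the look-ahead plumbing and keep only the grouping logic in your own reader. A rough sketch only; the PeekableReaderSketch class and its wrap method are illustrative names, not part of the tutorial's code:

package com.myapp.item.reader;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

import com.myapp.model.MyAppDetail;

// wraps the delegate in Spring Batch's own peekable reader; peek() then returns the next
// item without consuming it, and the following read() returns that same item
public class PeekableReaderSketch {

    public static SingleItemPeekableItemReader<MyAppDetail> wrap(ItemReader<MyAppDetail> flatFileItemReader) {
        SingleItemPeekableItemReader<MyAppDetail> peekable = new SingleItemPeekableItemReader<MyAppDetail>();
        peekable.setDelegate(flatFileItemReader);
        return peekable;
    }
}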


Step 3: The utility class that can be used to inject the step and job execution contexts into your reader, processor, or writer classes.

package com.myapp.util;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

public class StepExecutionListenerCtxInjecter {

    private ExecutionContext stepExecutionCtx;

    private ExecutionContext jobExecutionCtx;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        stepExecutionCtx = stepExecution.getExecutionContext();
        jobExecutionCtx = stepExecution.getJobExecution().getExecutionContext();
    }

    public ExecutionContext getStepExecutionCtx() {
        return stepExecutionCtx;
    }

    public ExecutionContext getJobExecutionCtx() {
        return jobExecutionCtx;
    }
}
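Any bean that needs the contexts can then have this listener injected, as the header callback handler and the writer below do. For example, a hypothetical processor (not part of this job's configuration, shown only to illustrate the pattern) could read a value back from the job execution context like this:

package com.myapp.processor;

import javax.annotation.Resource;

import org.springframework.batch.item.ItemProcessor;

import com.myapp.model.MyAppPortfolioParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

// hypothetical processor, purely to illustrate reading from the job execution context
public class MyAppItemProcessor implements ItemProcessor<MyAppPortfolioParent, MyAppPortfolioParent> {

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    @Override
    public MyAppPortfolioParent process(MyAppPortfolioParent item) throws Exception {
        // values put into the job execution context (e.g. by the header callback handler)
        // can be read back here in a later phase of the step
        Object headerData = stepExecutionListener.getJobExecutionCtx().get("feedHeaderData");
        // ... use headerData to enrich or validate the item ...
        return item;
    }
}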

Step 4: As you can see in the Spring config file, we skip the first record, which is the header record, and define a LineCallbackHandler to handle it. Here is the implementation of this handler.

package com.myapp.handler;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.stereotype.Component;

import com.myapp.dao.MyAppDao;
import com.myapp.model.MyAppMeta;
import com.myapp.util.MyAppUtil;
import com.myapp.util.StepExecutionListenerCtxInjecter;

@Component(value = "myAppFileHeaderLineCallbackHandler")
public class MyAppFileHeaderCallbackHandler implements LineCallbackHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyAppFileHeaderCallbackHandler.class);

    public static final String FEED_HEADER_DATA = "feedHeaderData";

    @Resource(name = "myappFeedDao")
    private MyAppDao myappDao;

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    @Override
    public void handleLine(String headerLine) {
        LOGGER.debug("header line: {}", headerLine);

        // convert the CSV header data into a MyAppMeta object
        MyAppMeta cfMeta = MyAppUtil.getMyAppMetaFromHeader(headerLine, null);

        // logically delete the current records
        int noOfRecordsLogicallyDeleted = myappDao.logicallyDelete(cfMeta);
        LOGGER.info("No of records logically deleted: " + noOfRecordsLogicallyDeleted);

        // save the header data in the job execution context
        stepExecutionListener.getJobExecutionCtx().put(FEED_HEADER_DATA, cfMeta);
    }
}
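The MyAppUtil.getMyAppMetaFromHeader(..) helper used above is not shown in this post. A rough sketch of what it could do with a header line like "Portfolio1","29/02/2012","11/03/2012" is given below; the MyAppMeta setters, the date format, and the meaning of the second (ignored) argument are assumptions:

package com.myapp.util;

import java.text.SimpleDateFormat;

import com.myapp.model.MyAppMeta;

public final class MyAppUtil {

    private MyAppUtil() {
    }

    // Sketch only: parses the quoted header line into a MyAppMeta object.
    // The MyAppMeta setter names used here are assumed, not taken from the original post.
    public static MyAppMeta getMyAppMetaFromHeader(String headerLine, String ignored) throws Exception {
        String[] tokens = headerLine.split(",");
        SimpleDateFormat df = new SimpleDateFormat("dd/MM/yyyy");

        MyAppMeta meta = new MyAppMeta();
        meta.setPortfolioCd(strip(tokens[0]));
        meta.setFromDate(df.parse(strip(tokens[1])));
        meta.setToDate(df.parse(strip(tokens[2])));
        return meta;
    }

    // removes surrounding quotes and whitespace from a CSV token
    private static String strip(String token) {
        return token.trim().replaceAll("^\"|\"$", "");
    }
}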


Step 5: The FlatFileItemReader has a FieldSetMapper defined to map each row to an object. This mapper gets invoked for each CSV line that is read and converts the fields to an object as shown below.

package com.myapp.mapper;

import java.math.BigDecimal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.myapp.model.MyAppDetail;

public class MyAppFieldSetMapper implements FieldSetMapper<MyAppDetail> {

    private final static Logger logger = LoggerFactory.getLogger(MyAppFieldSetMapper.class);

    @Override
    public MyAppDetail mapFieldSet(FieldSet fs) throws BindException {

        if (fs == null) {
            return null;
        }

        MyAppDetail detail = new MyAppDetail();
        detail.setPortfolioCd(fs.readString("portfolioCd"));
        detail.setAccountCd(fs.readString("accountCd"));
        detail.setTxnCd(fs.readString("txnCd"));

        BigDecimal cashValue = fs.readBigDecimal("cashValue");
        detail.setCashValue(cashValue != null ? cashValue : BigDecimal.ZERO);

        return detail;
    }
}
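Since the mapper has no dependencies, it is easy to exercise on its own. A quick stand-alone sanity check might look like the following (the MyAppFieldSetMapperCheck class is just an illustrative name; it uses Spring's DefaultFieldSet to simulate a tokenized CSV line):

package com.myapp.mapper;

import org.springframework.batch.item.file.transform.DefaultFieldSet;
import org.springframework.batch.item.file.transform.FieldSet;

import com.myapp.model.MyAppDetail;

// a quick, stand-alone sanity check for the field set mapper
public class MyAppFieldSetMapperCheck {

    public static void main(String[] args) throws Exception {
        // simulate one tokenized CSV line with the same field names as the tokenizer config
        FieldSet fs = new DefaultFieldSet(
                new String[] { "Portfolio1", "Account1", "PURCHASE", "1000.00" },
                new String[] { "portfolioCd", "accountCd", "txnCd", "cashValue" });

        MyAppDetail detail = new MyAppFieldSetMapper().mapFieldSet(fs);
        System.out.println(detail.getTxnCd() + " " + detail.getCashValue()); // prints: PURCHASE 1000.00
    }
}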


Step 6: The writer class that is responsible for writing a group of items (i.e. the parent and its child records) to the database.

package com.myapp.item.writer;

import java.util.List;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

import com.myapp.dao.MyAppDao;
import com.myapp.handler.MyAppFileHeaderCallbackHandler;
import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;
import com.myapp.model.MyAppPortfolioParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

public class MyAppItemWriter implements ItemWriter<MyAppPortfolioParent> {

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener; // to get the step and job contexts

    @Resource(name = "myappFeedDao")
    private MyAppDao myappDao; // DAO class for saving records into the database

    private final static Logger logger = LoggerFactory.getLogger(MyAppItemWriter.class);

    @Override
    public void write(List<? extends MyAppPortfolioParent> portfolioDetails) {

        // retrieve the header data previously stored in the job context
        MyAppMeta pfMeta = (MyAppMeta) stepExecutionListener.getJobExecutionCtx().get(
                MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA);

        int batchJobId = -1;

        // retrieve the batch job id previously stored in the job context
        if (stepExecutionListener.getJobExecutionCtx().get("batchJobId") != null) {
            batchJobId = stepExecutionListener.getJobExecutionCtx().getInt("batchJobId");
        }
        pfMeta.setBatchJobId(batchJobId);

        try {
            for (MyAppPortfolioParent cfp : portfolioDetails) {

                MyAppDetail bd = cfp.getBalanceDetail();

                // save the balance (OPENBAL) record
                int noOfRecords = myappDao.saveMyAppBalance(bd, pfMeta);
                logger.info("No of balance records inserted " + noOfRecords);

                int syntheticId = myappDao.getMyAppId(bd, pfMeta);

                // save the transaction detail records that belong to this balance record
                List<MyAppDetail> txnDetails = cfp.getTxnDetails();
                for (MyAppDetail txd : txnDetails) {
                    myappDao.saveMyAppDetail(txd, syntheticId);
                }
            }
        } catch (Exception e) {
            logger.error("MyAppItemWriter error", e);
            throw new RuntimeException(e);
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Committing chunks to the database ...... ");
        }
    }
}
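The DAO used by the header handler and the writer is not shown in this post. Based on the calls made above, its interface would look roughly like the sketch below; the method and parameter names are simply inferred from that usage, and the actual implementation could be plain JDBC, iBATIS, Hibernate, or anything else:

package com.myapp.dao;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;

// inferred from the calls in MyAppFileHeaderCallbackHandler and MyAppItemWriter;
// this is a sketch of the expected shape, not the original implementation
public interface MyAppDao {

    // logically delete any existing records for the portfolio/date range in the header
    int logicallyDelete(MyAppMeta meta);

    // insert the OPENBAL balance record and return the number of rows inserted
    int saveMyAppBalance(MyAppDetail balanceDetail, MyAppMeta meta);

    // look up the synthetic (generated) key of the balance record just saved
    int getMyAppId(MyAppDetail balanceDetail, MyAppMeta meta);

    // insert a transaction detail record linked to the balance record via the synthetic id
    void saveMyAppDetail(MyAppDetail detail, int syntheticId);
}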



