Hi,

I was asking about the closure that is passed to the
CacheStore.loadCache(closure, ..) method.


On Fri, Feb 24, 2017 at 9:31 PM, diopek <deha.pe...@gmail.com> wrote:

> package myignite.loading.test.cache.store;
>
> import static myignite.loading.test.common.CommonUtils.CONFIG_DIR;
> import static myignite.loading.test.common.CommonUtils.DATA_SRC_PWD;
> import static myignite.loading.test.common.CommonUtils.DATA_SRC_URL;
> import static myignite.loading.test.common.CommonUtils.DATA_SRC_USR;
> import static myignite.loading.test.common.CommonUtils.RWA_SQL_FETCH_SIZE;
> import static
> myignite.loading.test.common.CommonUtils.SQL_FETCH_SIZE_DEFAULT;
> import static
> myignite.loading.test.common.CommonUtils.TREAS_LIQUIDITY_CLASS_UNDEFINED;
> import static myignite.loading.test.common.CommonUtils.REPLENISH;
> import static myignite.loading.test.common.CommonUtils.EXISTING;
> import static myignite.loading.test.common.CommonUtils.stopWatchEnd;
> import static myignite.loading.test.common.CommonUtils.stopWatchStart;
> import org.jooq.lambda.tuple.Tuple2;
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.Iterator;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicLong;
>
> import javax.cache.integration.CacheLoaderException;
>
> import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter;
> import org.apache.ignite.internal.util.typedef.T2;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.springframework.dao.DataAccessException;
> import org.springframework.jdbc.core.JdbcTemplate;
> import org.springframework.jdbc.core.ResultSetExtractor;
> import org.springframework.jdbc.datasource.SingleConnectionDataSource;
> import org.springframework.util.Assert;
> import org.springframework.util.StopWatch;
>
> import myignite.loading.test.domain.MyDTO;
>
> public class ExistingOrReplenishCacheLoadOnlyStore3
>                 extends CacheLoadOnlyStoreAdapter<Long,
> ArrayList&lt;MyDTO>,
> Tuple2<Long,ArrayList&lt;MyDTO>>> {
>
>         private final static Logger logger =
> LoggerFactory.getLogger(ExistingOrReplenishCacheLoadOnlyStore3.class);
>
>         private static int SQL_FETCH_SIZE = SQL_FETCH_SIZE_DEFAULT;
>         private static String dataSourceUrl;
>         private static String dbUser;
>         private static String dbPwd;
>
>         private SingleConnectionDataSource DATA_SRC;
>
>         static {
>                 String configDir = System.getProperty(CONFIG_DIR);
>                 Assert.notNull(configDir, "config.dir should be passed as
> JVM
> arguments...");
>                 StringBuffer filePath = new StringBuffer(configDir);
>                 filePath.append(File.separatorChar).append("rwa-
> batch.properties");
>                 Properties props = new Properties();
>                 // InputStream inputStream =
>                 //
> ExistingCacheStore.class.getClassLoader().getResourceAsStream(configDir);
>                 try {
>                         InputStream inputStream = new
> FileInputStream(filePath.toString());
>                         Assert.notNull(inputStream,
>                                         "FileNotFoundException - property
> file '" + filePath + "' not found in
> file system");
>                         props.load(inputStream);
>                 } catch (IOException e) {
>                         e.printStackTrace();
>                 }
>                 dataSourceUrl = props.getProperty(DATA_SRC_URL);
>                 System.out.println(">>>dataSourceUrl::" + dataSourceUrl);
>                 Assert.notNull(dataSourceUrl, "'rwa.jdbc.url' should be
> provided in
> rwa-batch.properties...");
>                 dbUser = props.getProperty(DATA_SRC_USR);
>                 Assert.notNull(dbUser, "'rwa.jdbc.usr' should be provided
> in
> rwa-batch.properties...");
>                 dbPwd = System.getProperty(DATA_SRC_PWD);
>                 Assert.notNull(dbPwd, "'rwa.jdbc.pwd' should be provided in
> rwa-batch.properties...");
>
>                 String fetchSize = props.getProperty(RWA_SQL_FETCH_SIZE);
>                 if (fetchSize != null) {
>                         SQL_FETCH_SIZE = Integer.valueOf(fetchSize);
>                 }
>         }
>
>         private JdbcTemplate jdbcTemplate;
>
>         public ExistingOrReplenishCacheLoadOnlyStore3() {
>                 super();
>                 DATA_SRC = new SingleConnectionDataSource();
>                 DATA_SRC.setDriverClassName("oracle.jdbc.driver.
> OracleDriver");
>                 DATA_SRC.setUrl(dataSourceUrl);
>                 DATA_SRC.setUsername(dbUser);
>                 DATA_SRC.setPassword(dbPwd);
>                 jdbcTemplate = new JdbcTemplate(DATA_SRC);
>         }
>
>         @Override
>         protected Iterator<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
> inputIterator(Object... args) throws CacheLoaderException {
>                 if (args == null || args.length < 6)
>                         throw new CacheLoaderException(
>                                         "Expected asOf, scenId,
> HierarchyServiceProxy and replenish parameters
> are not fully provided...");
>                 try {
>                         final Date asOf = (Date) args[0];
>                         final String datasetId = (String) args[1];
>                         final Integer scenId = (Integer) args[2];
>                         final String sql = (String) args[3];
>                         final Boolean replenishFlag = (Boolean) args[4];
>                         Integer startSize =(Integer) args[5];
>                     logger.debug("AS_OF::{} DATASET_ID::{} SCEN_ID::{}
> REP_FLAG::{}
> START_SIZE::{}", asOf, datasetId, scenId,
>                                 replenishFlag, startSize);
>
>                         logger.debug("load{}Cache::SQL::{}",
> (replenishFlag ? "Replenish" :
> "Existing"), sql);
>                         ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
> extOrRepList = null;
> //                      Iterator<Entry&lt;Integer, ArrayList&lt;MyDTO>>>
> iterator = null;
> //                      Iterator<ArrayList&lt;MyDTO>> iterator = null;
>                         Iterator<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
> iterator = null;
>
> //                      ResultSetExtractor<LinkedHashMap&lt;Integer,
> ArrayList&lt;MyDTO>>>
> extOrRepMapResultSetExtractor = new
> ResultSetExtractor<LinkedHashMap&lt;Integer, ArrayList&lt;MyDTO>>>() {
>                         ResultSetExtractor<ArrayList&
> lt;Tuple2&lt;Long,ArrayList&lt;MyDTO>>>>
> extOrRepMapResultSetExtractor = new
> ResultSetExtractor<ArrayList&lt;Tuple2&lt;Long,ArrayList&lt;MyDTO>>>>() {
>                                 @Override
> //                              public LinkedHashMap<Integer,
> ArrayList&lt;MyDTO>>
> extractData(ResultSet rs)
>                                 public ArrayList<Tuple2&lt;Long,
> ArrayList&lt;MyDTO>>>
> extractData(ResultSet rs)
>                                                 throws SQLException,
> DataAccessException {
>
> //                                      LinkedHashMap<Integer,
> ArrayList&lt;MyDTO>> extOrRepMap = new
> LinkedHashMap<Integer, ArrayList&lt;MyDTO>>(
> //                                                      gockeyCnt, 1.0f);
> //                                      ArrayList<ArrayList&lt;MyDTO>>
> extOrRepList = new
> ArrayList<ArrayList&lt;MyDTO>>(
> //                                                      gockeyCnt);
>                                         
> ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
> extOrRepList = new
> ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>(startSize);
>                                         String prevGoc = null, prevAcct =
> null, prevSac = null, prevCcy = null;
>                                         Integer prevFpId = null;
>                                         ArrayList<MyDTO> currDTOList =
> null, prevDTOList = null;
>                                         MyDTO dto = null, prevDto = null;
> //                                      final AtomicInteger entryCnt = new
> AtomicInteger(0);
>                                         while (rs.next()) {
>                                                 int i = 1;
>                                                 dto = new MyDTO();
>                                                 dto.setAsOf(asOf);
>                                                 dto.setDatasetId(Integer.
> valueOf(datasetId));
>                                                 dto.setScnId(scenId);
>
>
> dto.setGoc(rs.getString(i++));
>
> dto.setAcct(rs.getString(i++));
>                                                 dto.setSumAffilCode(rs.
> getString(i++));
>
> dto.setCcyCode(rs.getString(i++));
>
> dto.setFrcstProdId(rs.getInt(i++));
>
> dto.setMngSeg(rs.getString(i++));
>
> dto.setMngGeo(rs.getString(i++));
>
> dto.setFrsBu(rs.getString(i++));
>                                                 if (replenishFlag) {
>
> dto.setReplenishFlag(REPLENISH);
>                                                 } else {
>
> dto.setRwaExposureType(rs.getString(i++));
>
> dto.setRiskAssetClass(rs.getString(i++));
>
> dto.setRiskSubAssetClass(rs.getString(i++));
>                                                         String
> treasLiqClass = rs.getString(i++);
>
> dto.setTreasLiqClass((treasLiqClass == null ?
> TREAS_LIQUIDITY_CLASS_UNDEFINED :treasLiqClass));
>
> dto.setCounterpartyRating(rs.getString(i++));
>
> dto.setClearedStatus(rs.getString(i++));
>
> dto.setMaturityBand(rs.getString(i++));
>
> dto.setDerivativeType(rs.getString(i++));
>
> dto.setReplenishFlag(EXISTING);
>                                                 }
>
> dto.setStartDate(rs.getDate(i++));
>                                                 dto.setMaturityDate(rs.
> getDate(i++));
>
> dto.setAmount(rs.getDouble(i++));
>                                                 if(!replenishFlag) {
>
> dto.setEtlsource(rs.getString(i++));
>                                                 }else {
>
> dto.setInvestmentId(rs.getString(i++));
>                                                 }
>                                                 if
> (dto.getGoc().equals(prevGoc) && dto.getAcct().equals(prevAcct)
>                                                                 &&
> dto.getSumAffilCode().equals(prevSac) &&
> dto.getCcyCode().equals(prevCcy)
>                                                                 &&
> dto.getFrcstProdId().equals(prevFpId)) {
>
> prevDTOList.add(prevDto);
>                                                 } else {
>                                                         if (prevDto !=
> null) {
>
> prevDTOList.add(prevDto);
> //
> extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList);
>
> extOrRepList.add(new Tuple2<Long,
> ArrayList&lt;MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
>                                                         }
>                                                         currDTOList = new
> ArrayList<MyDTO>();
>                                                 }
>                                                 prevDto = dto;
>                                                 prevDTOList = currDTOList;
>                                                 prevGoc = dto.getGoc();
>                                                 prevAcct = dto.getAcct();
>                                                 prevSac =
> dto.getSumAffilCode();
>                                                 prevCcy = dto.getCcyCode();
>                                                 prevFpId =
> dto.getFrcstProdId();
>                                         }
>                                         if (prevDto != null) {
>                                                 prevDTOList.add(prevDto);
> //                                              
> extOrRepMap.put(entryCnt.incrementAndGet(),
> prevDTOList);
>                                                 extOrRepList.add(new
> Tuple2<Long,
> ArrayList&lt;MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
>                                         }
> //                                      return extOrRepMap;
>                                         return extOrRepList;
>                                 }
>
>                         };
>
>                         jdbcTemplate.setFetchSize(SQL_FETCH_SIZE);
>                         StopWatch sw = new StopWatch();
>                         stopWatchStart(sw, "populatingDataMap");
>                         logger.debug("BEFORE populatingDataMap_STARTS!!!!")
> ;
> //                      extOrRepMap = jdbcTemplate.query(sql,
> extOrRepMapResultSetExtractor);
>                         extOrRepList = jdbcTemplate.query(sql,
> extOrRepMapResultSetExtractor);
>                         logger.debug("BEFORE populatingDataMap_ENDS!!!!");
>                         stopWatchEnd(sw);
>
>                         if (extOrRepList != null) {
> //                              iterator = extOrRepMap.entrySet().
> iterator();
>                                 /*
>                                  * RECORDS COUNT PRINTED CORRECTLY HERE
> BEFORE PASSING TO IGNITE
>                                  */
>                                 logger.debug("+++++++ GOC_KEY
> COUNT::{}",extOrRepList.size());
>                                 iterator = extOrRepList.iterator();
>                         }
>                         return iterator;
>                 } finally {
>                         DATA_SRC.destroy();
>                 }
>         }
>
>         @Override
>         protected IgniteBiTuple<Long, ArrayList&lt;MyDTO>>
> parse(Tuple2<Long,ArrayList&lt;MyDTO>> rec, Object... args) {
>                 return new T2<>(rec.v1(), rec.v2());
>         }
>
> }
>
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Missing-records-Ignite-cache-size-
> grows-tp10809p10871.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>



-- 
Best regards,
Andrey V. Mashenkov

Reply via email to