[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411374#comment-15411374 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73826686
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
        */
       @Override
       public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
    -      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
    -      com.datatorrent.api.Partitioner.PartitioningContext context)
    +      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
       {
         List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
             getPartitionCount());
    -    JdbcStore jdbcStore = new JdbcStore();
    -    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
    -    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
    -    jdbcStore.setConnectionProperties(store.getConnectionProperties());
     
    -    jdbcStore.connect();
    -
    -    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
    +    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
         try {
    -      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
    -          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
    -          store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
    -          whereCondition, emitColumnList);
    +      store.connect();
    +      intializeDSLContext();
    +      partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
         } catch (SQLException e) {
           LOG.error("Exception in initializing the partition range", e);
    +      throw new RuntimeException(e);
    +    } finally {
    +      store.disconnect();
         }
     
         KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
     
    +    // The n given partitions are for range queries and n + 1 partition is for polling query
         for (int i = 0; i <= getPartitionCount(); i++) {
    -      AbstractJdbcPollInputOperator<T> jdbcPoller = null;
    -
    -      jdbcPoller = cloneUtils.getClone();
    -
    -      jdbcPoller.setStore(store);
    -      jdbcPoller.setKey(getKey());
    -      jdbcPoller.setPartitionCount(getPartitionCount());
    -      jdbcPoller.setPollInterval(getPollInterval());
    -      jdbcPoller.setTableName(getTableName());
    -      jdbcPoller.setBatchSize(getBatchSize());
    -      jdbcPoller.setEmitColumnList(getEmitColumnList());
    -
    -      store.connect();
    -      //The n given partitions are for range queries and n + 1 partition is for polling query
    -      //The upper bound for the n+1 partition is set to null since its a pollable partition
    +      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
           if (i < getPartitionCount()) {
    -        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
    -        isPollable = false;
    +        jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
    +        jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
    --- End diff --
    
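    Should ```jdbcPoller.isPollerPartition = false``` be set here? The removed code set ```isPollable = false``` for the range-query partitions at this point.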
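
    To illustrate why an explicit assignment may matter, here is a minimal sketch, not the operator's actual code. ```Poller```, ```cloneOf``` and ```definePartitions(Poller, int[][])``` are hypothetical stand-ins. It assumes each partition is a copy of the current instance, so a copy inherits whatever flag value the source carries unless the loop overwrites it, and it assumes the n + 1 partition starts where the last range ends with no upper bound.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Apex Malhar code: shows the flag being set
// explicitly on every copy instead of relying on the copied value.
final class PollerFlagSketch {

  /** Stand-in for one cloned operator instance. */
  static final class Poller {
    Integer lower;              // first row offset this partition reads
    Integer upper;              // exclusive upper bound, null means open-ended
    boolean isPollerPartition;  // true only for the n + 1 polling partition
  }

  /** Mimics cloning: the copy carries over every field, including the flag. */
  static Poller cloneOf(Poller source) {
    Poller copy = new Poller();
    copy.lower = source.lower;
    copy.upper = source.upper;
    copy.isPollerPartition = source.isPollerPartition;
    return copy;
  }

  /** Builds n range partitions plus one polling partition from n ranges. */
  static List<Poller> definePartitions(Poller source, int[][] ranges) {
    int n = ranges.length;
    List<Poller> pollers = new ArrayList<>(n + 1);
    for (int i = 0; i <= n; i++) {
      Poller p = cloneOf(source);
      if (i < n) {
        p.lower = ranges[i][0];
        p.upper = ranges[i][1];
        p.isPollerPartition = false; // explicit, whatever the source held
      } else {
        // Assumption: the polling partition has no upper bound.
        p.lower = (n == 0) ? 0 : ranges[n - 1][1];
        p.upper = null;
        p.isPollerPartition = true;
      }
      pollers.add(p);
    }
    return pollers;
  }
}
```

    In this sketch, even when ```source.isPollerPartition``` is true, the first n copies end up with the flag cleared and only the last copy keeps it set.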


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
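
Item 4 can be pictured with a short sketch. This is an illustration under stated assumptions, not the operator's implementation: ```RowCountRanges``` and ```split``` are hypothetical names, and the sketch assumes the total row count is already known and that each range is a half-open pair of row offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apex Malhar: splits a known row count
// into contiguous, near-equal ranges of row offsets.
final class RowCountRanges {

  /**
   * Returns partitionCount pairs {lowerInclusive, upperExclusive} that
   * together cover the offsets [0, rowCount) with no gaps or overlaps.
   */
  static List<int[]> split(int rowCount, int partitionCount) {
    if (rowCount < 0 || partitionCount <= 0) {
      throw new IllegalArgumentException("rowCount must be >= 0 and partitionCount > 0");
    }
    List<int[]> ranges = new ArrayList<>(partitionCount);
    int base = rowCount / partitionCount;      // minimum rows per partition
    int remainder = rowCount % partitionCount; // first partitions take one extra row
    int lower = 0;
    for (int i = 0; i < partitionCount; i++) {
      int size = base + (i < remainder ? 1 : 0);
      ranges.add(new int[] {lower, lower + size});
      lower += size;
    }
    return ranges;
  }
}
```

For 10 rows and 3 partitions, ```RowCountRanges.split(10, 3)``` yields the ranges [0, 4), [4, 7) and [7, 10). Splitting by row offsets rather than key values keeps partition sizes even when key values are sparse or skewed.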



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
