[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405741#comment-15405741 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73320448
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -80,201 +93,48 @@
      * @tags database, sql, jdbc, partitionable,exactlyOnce
      */
     @Evolving
    -public abstract class AbstractJdbcPollInputOperator<T> extends 
AbstractStoreInputOperator<T, JdbcStore>
    -    implements ActivationListener<Context>, IdleTimeHandler, 
Partitioner<AbstractJdbcPollInputOperator<T>>
    +public abstract class AbstractJdbcPollInputOperator<T> extends 
AbstractStoreInputOperator<T, JdbcStore> implements
    +    ActivationListener<OperatorContext>, IdleTimeHandler, 
Partitioner<AbstractJdbcPollInputOperator<T>>
     {
    +  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
    +  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
    +  private static int DEFAULT_FETCH_SIZE = 20000;
       /**
        * poll interval in milliseconds
        */
    -  private static int pollInterval = 10000;
    +  private int pollInterval = DEFAULT_POLL_INTERVAL;
    +  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    +  private int fetchSize = DEFAULT_FETCH_SIZE;
     
       @Min(1)
       private int partitionCount = 1;
       protected transient int operatorId;
    -  protected transient boolean isReplayed;
    -  protected transient boolean isPollable;
    +  protected boolean isPollerPartition;
       protected int batchSize;
    -  protected static int fetchSize = 20000;
    +
       /**
        * Map of windowId to <lower bound,upper bound> of the range key
        */
    -  protected transient MutablePair<String, String> 
currentWindowRecoveryState;
    +  protected transient MutablePair<Integer, Integer> 
currentWindowRecoveryState;
    +  private transient DSLContext create;
     
    -  /**
    -   * size of the emit queue used to hold polled records before emit
    -   */
    -  private static int queueCapacity = 4 * 1024 * 1024;
       private transient volatile boolean execute;
       private transient AtomicReference<Throwable> cause;
    +  private String tableName;
    +  private String columnsExpression;
    +  private String whereCondition = null;
    +  private String key;
       protected transient int spinMillis;
    -  private transient OperatorContext context;
    -  protected String tableName;
    -  protected String key;
       protected long currentWindowId;
    -  protected KeyValPair<String, String> rangeQueryPair;
    -  protected String lower;
    -  protected String upper;
    -  protected boolean recovered;
    -  protected boolean isPolled;
    -  protected String whereCondition = null;
    -  protected String previousUpperBound;
    -  protected String highestPolled;
    -  private static final String user = "";
    -  private static final String password = "";
    -  /**
    -   * thread to poll database
    -   */
    -  private transient Thread dbPoller;
    -  protected transient ArrayBlockingQueue<List<T>> emitQueue;
    +  protected KeyValPair<Integer, Integer> rangeQueryPair;
    +  protected Integer lower;
    --- End diff --
    
    Could this field have a more descriptive name, e.g. `lowerBound`?


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to