[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15414743#comment-15414743
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74187951
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
    @@ -438,119 +261,110 @@ public void endWindow()
         currentWindowRecoveryState = new MutablePair<>();
       }
     
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    -  }
    -
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.add(getTuple(result));
    +        } while (result.next());
           }
    -    } catch (InterruptedException ex) {
    -      // log and ignore, ending execution anyway
    -      LOG.error("exception in poller thread: ", ex);
    +      isPolled = true;
    --- End diff --
    
    the fetchSize, is provided to the PreparedStatement to tell it how many 
records to fetch at a time i..e. we can say how many records to fetch as a 
page. Multiple of such pages will be fetched based on how many records are 
present. e.g. we have 1000 records and fetchSize is 100, then 10 pages of 100 
records each will be fetched.


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to