[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411512#comment-15411512
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73840984
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
    @@ -438,119 +261,110 @@ public void endWindow()
         currentWindowRecoveryState = new MutablePair<>();
       }
     
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    -  }
    -
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.add(getTuple(result));
    +        } while (result.next());
           }
    -    } catch (InterruptedException ex) {
    -      // log and ignore, ending execution anyway
    -      LOG.error("exception in poller thread: ", ex);
    +      isPolled = true;
    +    } catch (SQLException ex) {
    +      throw new RuntimeException(String.format("Error while running 
query"), ex);
    +    } finally {
    +      store.disconnect();
    --- End diff --
    
    This will be called even in case of no exceptions and create problems.
    We should let the exception reach the operator thread. Otherwise this might 
never get lost in the thread.


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to