[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411598#comment-15411598 ]
ASF GitHub Bot commented on APEXMALHAR-2172: -------------------------------------------- Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73848661 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { - return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { - this.partitionCount = partitionCount; + scanService.shutdownNow(); + store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID - && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here + if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { - dbPoller.interrupt(); - dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { + do { + emitQueue.add(getTuple(result)); + } while (result.next()); } - } catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; + } catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); + } finally { + store.disconnect(); --- End diff -- calling disconnect for non-poller threads as they are not going to connect to db again, also updated exception handling. 
> Update JDBC poll input operator to fix issues > --------------------------------------------- > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement > Reporter: Priyanka Gugale > Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)