Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73845347 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { - return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { - this.partitionCount = partitionCount; + scanService.shutdownNow(); + store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID - && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here + if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { - dbPoller.interrupt(); - dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { + do { + emitQueue.add(getTuple(result)); + } while (result.next()); } - } catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; --- End diff -- The thread execution is not bound to window. And even if the pollInterval is short and there are multiple calls to poll record, as it's a single thread, the subsequent calls will block till this one is done. And the isPolled will be "true" for consequent calls of non-poller threads.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---