GitHub user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73840984 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { - return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { - this.partitionCount = partitionCount; + scanService.shutdownNow(); + store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID - && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here + if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { - dbPoller.interrupt(); - dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { + do { + emitQueue.add(getTuple(result)); + } while (result.next()); } - } catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; + } catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); + } finally { + store.disconnect(); --- End diff -- This will be called even in case of no exceptions and create problems. We should let the exception reach the operator thread. Otherwise this might get lost in the thread.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---