[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413018#comment-15413018
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r74004081
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -438,119 +261,110 @@ public void endWindow()
currentWindowRecoveryState = new MutablePair<>();
}
- public int getPartitionCount()
- {
- return partitionCount;
- }
-
- public void setPartitionCount(int partitionCount)
+ @Override
+ public void deactivate()
{
- this.partitionCount = partitionCount;
+ scanService.shutdownNow();
+ store.disconnect();
}
- @Override
- public void activate(Context cntxt)
+ protected void pollRecords()
{
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
- && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
- // If it is a replay state, don't start any threads here
+ if (isPolled) {
return;
}
- }
-
- @Override
- public void deactivate()
- {
try {
- if (dbPoller != null && dbPoller.isAlive()) {
- dbPoller.interrupt();
- dbPoller.join();
+ ps.setFetchSize(getFetchSize());
+ ResultSet result = ps.executeQuery();
+ if (result.next()) {
+ do {
+ emitQueue.add(getTuple(result));
+ } while (result.next());
}
- } catch (InterruptedException ex) {
- // log and ignore, ending execution anyway
- LOG.error("exception in poller thread: ", ex);
+ isPolled = true;
--- End diff --
What about non-poller partitions? ```pollRecords()``` fetches only
```fetchSize``` records and then sets ```isPolled = true```, so every later
call returns immediately. As a result, a non-poller partition never reads more
than ```fetchSize``` records.
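To make the concern concrete, here is a minimal sketch, not the operator's actual code. `PollOnceSketch`, its in-memory `table`, and the treatment of `fetchSize` as a hard cap on rows per query are all assumptions made for illustration. In plain JDBC, `Statement.setFetchSize` is only a hint to the driver about rows per round trip, so whether the cap really applies depends on how the query itself is bounded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a non-poller partition; names are illustrative only.
class PollOnceSketch {
  private final List<Integer> table;      // pretend rows assigned to this partition
  private final int fetchSize;            // ASSUMPTION: acts as a hard cap per query
  private final List<Integer> emitQueue = new ArrayList<>();
  private boolean isPolled = false;

  PollOnceSketch(List<Integer> table, int fetchSize) {
    this.table = table;
    this.fetchSize = fetchSize;
  }

  // Mirrors the guard in the diff: the first call reads, later calls do nothing.
  void pollRecords() {
    if (isPolled) {
      return;
    }
    int limit = Math.min(fetchSize, table.size());
    for (int i = 0; i < limit; i++) {
      emitQueue.add(table.get(i));
    }
    isPolled = true;
  }

  List<Integer> getEmitQueue() {
    return emitQueue;
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      rows.add(i);
    }
    PollOnceSketch partition = new PollOnceSketch(rows, 3);
    partition.pollRecords();
    partition.pollRecords(); // no-op because isPolled is already true
    // prints "3 of 10 rows emitted"
    System.out.println(partition.getEmitQueue().size() + " of " + rows.size() + " rows emitted");
  }
}
```

Under that assumption the remaining seven rows are never read, which is the behavior the question above is asking about.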
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads