[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411425#comment-15411425
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73830019
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
@@ -438,119 +261,110 @@ public void endWindow()
currentWindowRecoveryState = new MutablePair<>();
}
- public int getPartitionCount()
- {
- return partitionCount;
- }
-
- public void setPartitionCount(int partitionCount)
+ @Override
+ public void deactivate()
{
- this.partitionCount = partitionCount;
+ scanService.shutdownNow();
+ store.disconnect();
}
- @Override
- public void activate(Context cntxt)
+ protected void pollRecords()
{
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) !=
Stateless.WINDOW_ID
- && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) <
windowManager.getLargestRecoveryWindow()) {
- // If it is a replay state, don't start any threads here
+ if (isPolled) {
return;
}
- }
-
- @Override
- public void deactivate()
- {
try {
- if (dbPoller != null && dbPoller.isAlive()) {
- dbPoller.interrupt();
- dbPoller.join();
+ ps.setFetchSize(getFetchSize());
+ ResultSet result = ps.executeQuery();
+ if (result.next()) {
+ do {
+ emitQueue.add(getTuple(result));
+ } while (result.next());
}
- } catch (InterruptedException ex) {
- // log and ignore, ending execution anyway
- LOG.error("exception in poller thread: ", ex);
+ isPolled = true;
+ } catch (SQLException ex) {
+ throw new RuntimeException(String.format("Error while running
query"), ex);
+ } finally {
+ store.disconnect();
}
- }
- @Override
- public void handleIdleTime()
- {
- if (execute) {
- try {
- Thread.sleep(spinMillis);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- } else {
- LOG.error("Exception: ", cause);
- DTThrowable.rethrow(cause.get());
- }
}
+ public abstract T getTuple(ResultSet result);
+
protected void replay(long windowId) throws SQLException
{
- isReplayed = true;
- MutablePair<String, String> recoveredData = new MutablePair<String,
String>();
try {
- recoveredData = (MutablePair<String,
String>)windowManager.load(operatorId, windowId);
+ MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer,
Integer>)windowManager.load(operatorId,
+ windowId);
- if (recoveredData != null) {
- //skip the window and return if there was no incoming data in the
window
- if (recoveredData.left == null || recoveredData.right == null) {
- return;
- }
-
- if (recoveredData.right.equals(rangeQueryPair.getValue()) ||
recoveredData.right.equals(previousUpperBound)) {
- LOG.info("Matched so returning");
- return;
- }
+ if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+ LOG.debug("[Recovering Window ID - {} for record range: {}, {}]",
windowId, recoveredData.left,
+ recoveredData.right);
- JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- isPollable = false;
-
- LOG.debug("[Window ID -" + windowId + "," + recoveredData.left +
"," + recoveredData.right + "]");
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(rangeQueryPair.getKey(),
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --
Should this fetch data from ```recoveredData.left``` to
```recoveredData.right```?
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)