[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411529#comment-15411529
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user DT-Priyanka commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73841763
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
@@ -438,119 +261,110 @@ public void endWindow()
currentWindowRecoveryState = new MutablePair<>();
}
- public int getPartitionCount()
- {
- return partitionCount;
- }
-
- public void setPartitionCount(int partitionCount)
+ @Override
+ public void deactivate()
{
- this.partitionCount = partitionCount;
+ scanService.shutdownNow();
+ store.disconnect();
}
- @Override
- public void activate(Context cntxt)
+ protected void pollRecords()
{
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) !=
Stateless.WINDOW_ID
- && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) <
windowManager.getLargestRecoveryWindow()) {
- // If it is a replay state, don't start any threads here
+ if (isPolled) {
return;
}
- }
-
- @Override
- public void deactivate()
- {
try {
- if (dbPoller != null && dbPoller.isAlive()) {
- dbPoller.interrupt();
- dbPoller.join();
+ ps.setFetchSize(getFetchSize());
+ ResultSet result = ps.executeQuery();
+ if (result.next()) {
+ do {
+ emitQueue.add(getTuple(result));
+ } while (result.next());
}
- } catch (InterruptedException ex) {
- // log and ignore, ending execution anyway
- LOG.error("exception in poller thread: ", ex);
+ isPolled = true;
+ } catch (SQLException ex) {
+ throw new RuntimeException(String.format("Error while running
query"), ex);
+ } finally {
+ store.disconnect();
}
- }
- @Override
- public void handleIdleTime()
- {
- if (execute) {
- try {
- Thread.sleep(spinMillis);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- } else {
- LOG.error("Exception: ", cause);
- DTThrowable.rethrow(cause.get());
- }
}
+ public abstract T getTuple(ResultSet result);
+
protected void replay(long windowId) throws SQLException
{
- isReplayed = true;
- MutablePair<String, String> recoveredData = new MutablePair<String,
String>();
try {
- recoveredData = (MutablePair<String,
String>)windowManager.load(operatorId, windowId);
+ MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer,
Integer>)windowManager.load(operatorId,
+ windowId);
- if (recoveredData != null) {
- //skip the window and return if there was no incoming data in the
window
- if (recoveredData.left == null || recoveredData.right == null) {
- return;
- }
-
- if (recoveredData.right.equals(rangeQueryPair.getValue()) ||
recoveredData.right.equals(previousUpperBound)) {
- LOG.info("Matched so returning");
- return;
- }
+ if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+ LOG.debug("[Recovering Window ID - {} for record range: {}, {}]",
windowId, recoveredData.left,
+ recoveredData.right);
- JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- isPollable = false;
-
- LOG.debug("[Window ID -" + windowId + "," + recoveredData.left +
"," + recoveredData.right + "]");
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(rangeQueryPair.getKey(),
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+ LOG.info("Query formed to recover data - {}", ps.toString());
- jdbcPoller.setRangeQueryPair(new KeyValPair<String,
String>(recoveredData.left, recoveredData.right));
+ emitReplayedTuples(ps);
- jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(),
jdbcPoller.getKey(),
- jdbcPoller.getRangeQueryPair().getKey(),
jdbcPoller.getRangeQueryPair().getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- LOG.info("Query formed for recovered data - {}",
jdbcPoller.ps.toString());
+ }
- emitReplayedTuples(jdbcPoller.ps);
+ if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
+ try {
+ if (!isPollerPartition && rangeQueryPair.getValue() != null) {
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(lastEmittedRecord,
(rangeQueryPair.getValue() - lastEmittedRecord)), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ } else {
+ Integer bound = null;
+ if (lastEmittedRecord == null) {
+ bound = rangeQueryPair.getKey();
+ } else {
+ bound = lastEmittedRecord;
+ }
+ ps =
store.getConnection().prepareStatement(buildRangeQuery(bound,
Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ }
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval,
TimeUnit.MILLISECONDS);
--- End diff --
If we don't start the threads in the "activate" method, we want to do it in
beginWindow once "replay" is done.
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)