[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405809#comment-15405809
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73326772
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
public void setup(OperatorContext context)
{
super.setup(context);
+ intializeDSLContext();
+ if (scanService == null) {
+ scanService = Executors.newScheduledThreadPool(partitionCount);
+ }
spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
execute = true;
cause = new AtomicReference<Throwable>();
- emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
- this.context = context;
+ emitQueue = new LinkedBlockingDeque<>(queueCapacity);
operatorId = context.getId();
+ windowManager.setup(context);
+ }
- try {
+ private void intializeDSLContext()
+ {
+ create = DSL.using(store.getConnection(),
JDBCUtils.dialect(store.getDatabaseUrl()));
+ }
- //If its a range query pass upper and lower bounds
- //If its a polling query pass only the lower bound
- if (getRangeQueryPair().getValue() != null) {
- ps = store.getConnection()
- .prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(getTableName(),
getKey(), rangeQueryPair.getKey(),
- rangeQueryPair.getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
+ @Override
+ public void activate(OperatorContext context)
+ {
+ initializePreparedStatement();
+ long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+ if (largestRecoveryWindow == Stateless.WINDOW_ID
+ || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)
> largestRecoveryWindow) {
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval,
TimeUnit.MILLISECONDS);
+ }
+ }
+
+ protected void initializePreparedStatement()
+ {
+ try {
+ // If its a range query pass upper and lower bounds, If its a
polling query pass only the lower bound
+ if (isPollerPartition) {
+ ps =
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(),
Integer.MAX_VALUE),
--- End diff --
Are you expecting rangeQueryPair from the user? If so, mark it
```@NotNull```. This is not being set anywhere.
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)