[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405736#comment-15405736
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73319997
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
@@ -80,201 +93,48 @@
* @tags database, sql, jdbc, partitionable,exactlyOnce
*/
@Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends
AbstractStoreInputOperator<T, JdbcStore>
- implements ActivationListener<Context>, IdleTimeHandler,
Partitioner<AbstractJdbcPollInputOperator<T>>
+public abstract class AbstractJdbcPollInputOperator<T> extends
AbstractStoreInputOperator<T, JdbcStore> implements
+ ActivationListener<OperatorContext>, IdleTimeHandler,
Partitioner<AbstractJdbcPollInputOperator<T>>
{
+ private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+ private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+ private static int DEFAULT_FETCH_SIZE = 20000;
/**
* poll interval in milliseconds
*/
- private static int pollInterval = 10000;
+ private int pollInterval = DEFAULT_POLL_INTERVAL;
+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+ private int fetchSize = DEFAULT_FETCH_SIZE;
@Min(1)
private int partitionCount = 1;
protected transient int operatorId;
- protected transient boolean isReplayed;
- protected transient boolean isPollable;
+ protected boolean isPollerPartition;
protected int batchSize;
- protected static int fetchSize = 20000;
+
/**
* Map of windowId to <lower bound,upper bound> of the range key
*/
- protected transient MutablePair<String, String>
currentWindowRecoveryState;
+ protected transient MutablePair<Integer, Integer>
currentWindowRecoveryState;
+ private transient DSLContext create;
- /**
- * size of the emit queue used to hold polled records before emit
- */
- private static int queueCapacity = 4 * 1024 * 1024;
private transient volatile boolean execute;
private transient AtomicReference<Throwable> cause;
+ private String tableName;
+ private String columnsExpression;
--- End diff --
Can it be renamed to something like ```columnsListStr``` ?
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)