[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411322#comment-15411322
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73823756
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
@@ -80,201 +91,45 @@
* @tags database, sql, jdbc, partitionable,exactlyOnce
*/
@Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends
AbstractStoreInputOperator<T, JdbcStore>
- implements ActivationListener<Context>, IdleTimeHandler,
Partitioner<AbstractJdbcPollInputOperator<T>>
+public abstract class AbstractJdbcPollInputOperator<T> extends
AbstractStoreInputOperator<T, JdbcStore> implements
+ ActivationListener<OperatorContext>,
Partitioner<AbstractJdbcPollInputOperator<T>>
{
- /**
- * poll interval in milliseconds
- */
- private static int pollInterval = 10000;
+ private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+ private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+ private static int DEFAULT_FETCH_SIZE = 20000;
+ private static int DEFAULT_BATCH_SIZE = 2000;
+ private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+ private int fetchSize = DEFAULT_FETCH_SIZE;
@Min(1)
private int partitionCount = 1;
- protected transient int operatorId;
- protected transient boolean isReplayed;
- protected transient boolean isPollable;
- protected int batchSize;
- protected static int fetchSize = 20000;
- /**
- * Map of windowId to <lower bound,upper bound> of the range key
- */
- protected transient MutablePair<String, String>
currentWindowRecoveryState;
-
- /**
- * size of the emit queue used to hold polled records before emit
- */
- private static int queueCapacity = 4 * 1024 * 1024;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+
+ @NotNull
+ private String tableName;
+ @NotNull
+ private String columnsExpression;
+ @NotNull
+ private String key;
+ private String whereCondition = null;
+ private long currentWindowId;
+ private WindowDataManager windowManager;
+
+ protected KeyValPair<Integer, Integer> rangeQueryPair;
+ protected Integer lowerBound;
+ private transient int operatorId;
+ private transient DSLContext create;
private transient volatile boolean execute;
- private transient AtomicReference<Throwable> cause;
- protected transient int spinMillis;
- private transient OperatorContext context;
- protected String tableName;
- protected String key;
- protected long currentWindowId;
- protected KeyValPair<String, String> rangeQueryPair;
- protected String lower;
- protected String upper;
- protected boolean recovered;
- protected boolean isPolled;
- protected String whereCondition = null;
- protected String previousUpperBound;
- protected String highestPolled;
- private static final String user = "";
- private static final String password = "";
- /**
- * thread to poll database
- */
- private transient Thread dbPoller;
- protected transient ArrayBlockingQueue<List<T>> emitQueue;
+ private transient ScheduledExecutorService scanService;
+ protected transient boolean isPolled;
+ protected transient Integer lastPolledBound;
+ protected transient Integer lastEmittedRecord;
--- End diff --
Can you declare ```lowerBound```, ```lastPolledBound``` and
```lastEmittedRecord``` as ```int``` instead of ```Integer```?
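The comment does not give a reason for the request. One common motivation, offered here only as an assumption, is that a boxed Integer field with no initializer defaults to null and throws when auto-unboxed, while an int field defaults to 0. The sketch below uses a hypothetical BoundFields class (not the Malhar operator) to show the difference:

```java
// Hypothetical stand-in for the fields named in the review; not the Malhar class.
class BoundFields {
  Integer boxedBound;   // boxed field without initializer: defaults to null
  int primitiveBound;   // primitive field without initializer: defaults to 0

  // Returns true if auto-unboxing the boxed field throws NullPointerException.
  static boolean unboxingThrows(BoundFields fields) {
    try {
      int value = fields.boxedBound; // unboxing null throws here
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    BoundFields fields = new BoundFields();
    System.out.println("boxed default: " + fields.boxedBound);
    System.out.println("primitive default: " + fields.primitiveBound);
    System.out.println("unboxing throws: " + unboxingThrows(fields));
  }
}
```

With primitives, arithmetic and comparisons on the bounds need no null checks, at the cost of losing null as a "not set yet" marker.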
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
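Item 4 above can be illustrated with a minimal sketch of splitting a table into per-partition row ranges. This is not the operator's actual code: the class name RowRangePartitioner, the split method, and the choice to spread leftover rows over the first partitions are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; names and splitting policy are illustrative assumptions.
class RowRangePartitioner {
  // Returns one {offset, limit} pair per partition, together covering rows [0, totalRows).
  static List<int[]> split(int totalRows, int partitionCount) {
    if (totalRows < 0 || partitionCount < 1) {
      throw new IllegalArgumentException("totalRows must be >= 0 and partitionCount >= 1");
    }
    List<int[]> ranges = new ArrayList<>();
    int base = totalRows / partitionCount;      // rows every partition receives
    int remainder = totalRows % partitionCount; // leftover rows, one each to the first partitions
    int offset = 0;
    for (int i = 0; i < partitionCount; i++) {
      int limit = base + (i < remainder ? 1 : 0);
      ranges.add(new int[] {offset, limit});
      offset += limit;
    }
    return ranges;
  }

  public static void main(String[] args) {
    for (int[] range : split(10, 3)) {
      System.out.println("offset=" + range[0] + " limit=" + range[1]);
    }
  }
}
```

Each pair could then drive a query ordered by the key column with a matching row limit and offset, so the partitioning no longer depends on how key values are distributed.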