[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-07-29 Thread DT-Priyanka
GitHub user DT-Priyanka opened a pull request:

https://github.com/apache/apex-malhar/pull/358

[review only]APEXMALHAR-2172: Updates to JDBC Poll Input Operator

Changes include:
1. Use jOOQ to construct SQL queries
2. Update logic to query based on row numbers instead of primary key column values
3. Small fixes
4. Code refactoring to improve readability
5. Update operator to emit a POJO constructed from the table's input fields
6. Update test cases: abstract out generic test code and add more tests
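
The row-number-based querying in item 2 can be illustrated with a minimal sketch. This is not code from the pull request: `RowRangeSketch`, `RowRange` and `split` are hypothetical names, and the sketch assumes the static partitions divide the existing rows into contiguous offset/limit ranges.

```java
import java.util.ArrayList;
import java.util.List;

public class RowRangeSketch {
  /** Rows [offset, offset + limit) of an ordered result set (hypothetical type). */
  static final class RowRange {
    final long offset;
    final long limit;

    RowRange(long offset, long limit) {
      this.offset = offset;
      this.limit = limit;
    }

    @Override
    public String toString() {
      return "OFFSET " + offset + " LIMIT " + limit;
    }
  }

  /** Splits rowCount rows into partitionCount contiguous ranges of near-equal size. */
  static List<RowRange> split(long rowCount, int partitionCount) {
    if (rowCount < 0 || partitionCount < 1) {
      throw new IllegalArgumentException("rowCount must be >= 0 and partitionCount >= 1");
    }
    List<RowRange> ranges = new ArrayList<>();
    long base = rowCount / partitionCount;
    long remainder = rowCount % partitionCount;
    long offset = 0;
    for (int i = 0; i < partitionCount; i++) {
      // The first `remainder` partitions take one extra row each.
      long size = base + (i < remainder ? 1 : 0);
      ranges.add(new RowRange(offset, size));
      offset += size;
    }
    return ranges;
  }

  public static void main(String[] args) {
    for (RowRange range : split(10, 3)) {
      System.out.println(range);
    }
  }
}
```

Unlike ranges over primary key values, ranges over row positions do not depend on how the key values are distributed, but they do assume a stable ordering of the rows.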

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DT-Priyanka/incubator-apex-malhar APEXMALHAR-2172-jdbc-poller-input

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/358.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #358


commit 64cfd4767c2411ffbc87f5d87d413c888c935bad
Author: Priyanka Gugale 
Date:   2016-07-19T06:40:08Z

APEXMALHAR-2172: Updates to JDBC Poll Input Operator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-07-30 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r72892353
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -27,18 +27,49 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.Min;
 
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.SelectField;
+import org.jooq.conf.ParamType;
+import org.jooq.impl.DSL;
+import org.jooq.tools.jdbc.JDBCUtils;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
+/**
--- End diff --

Multiple file headers and import statements are mixed together.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-01 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r72958642
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
+  protected transient Integer lastEmittedRecord;
--- End diff --

Can lower, lastPolledBound and lastEmittedRecord be int instead of Integer?
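
As background to this question, here is a small sketch of what changes between the two types. The class and field names are hypothetical and not taken from the operator. A boxed `Integer` field can be `null`, which can stand for "not set yet", but unboxing a `null` throws. A primitive `int` defaults to 0.

```java
public class BoxedVsPrimitiveSketch {
  // Hypothetical fields, for illustration only.
  static Integer boxedBound;   // defaults to null: can mean "nothing polled yet"
  static int primitiveBound;   // defaults to 0: cannot be told apart from a real 0

  /** Returns true if auto-unboxing a null Integer throws NullPointerException. */
  static boolean unboxingNullThrows() {
    Integer unset = null;
    try {
      int value = unset; // auto-unboxing calls unset.intValue()
      return value != value; // not reached
    } catch (NullPointerException e) {
      return true;
    }
  }

  /** Boxed values should be compared with equals(); == may compare references. */
  static boolean sameValue(Integer a, Integer b) {
    return a != null && a.equals(b);
  }

  public static void main(String[] args) {
    System.out.println(boxedBound);
    System.out.println(primitiveBound);
    System.out.println(unboxingNullThrows());
    System.out.println(sameValue(1000, 1000));
  }
}
```

If the fields never need an "unset" state, `int` avoids both the null check and the boxing overhead.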




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73296580
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
+  protected transient Integer lastEmittedRecord;
+  protected transient ScheduledExecutorService scanService;
+  protected transient LinkedBlockingDeque emitQueue;
   protected transient PreparedStatement ps;
   protected WindowDataManager windowManager;
--- End diff --

It would be better to group the transient and non-transient fields into
separate sections.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319137
  
--- Diff: library/pom.xml ---
@@ -335,6 +335,12 @@
   7.0.6
 
 
+  org.jooq
+  jooq
+  3.6.4
+
+
--- End diff --

Don't add extra blank lines.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319381
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -51,22 +64,22 @@
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.netlet.util.DTThrowable;
 
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.jooq.impl.DSL.field;
+
 /**
  * Abstract operator for for consuming data using JDBC interface
- * User needs User needs to provide
- * tableName,dbConnection,setEmitColumnList,look-up key 
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given
- * 
+ * User needs to provide tableName, dbConnection, columnsExpression, 
look-up key
+ * Optionally batchSize, pollInterval and a where clause can be given 
  * This operator uses static partitioning to arrive at range queries for 
exactly
  * once reads
  * This operator will create a configured number of non-polling static
  * partitions for fetching the existing data in the table. And an 
additional
  * single partition for polling additive data. Assumption is that there is 
an
  * ordered column using which range queries can be formed
- * If an emitColumnList is provided, please ensure that the keyColumn is 
the
+ * If an columnsExpression is provided, please ensure that the keyColumn 
is the
--- End diff --

If this is a list, it would be better to call it a list rather than an expression.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319725
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
--- End diff --

If this is required, please mark it ```@NotNull```; otherwise, provide a
default value.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319799
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
--- End diff --

```@NotNull```?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319897
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
--- End diff --

```@NotNull``` for tableName, columnsExpression and key?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73319997
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
--- End diff --

Can it be renamed to something like ```columnsListStr``` ?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73320448
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
--- End diff --

Better name? ```lowerBound```?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73320816
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
--- End diff --

Can this actually be called ```upperBound```?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73321098
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
+  protected transient Integer lastEmittedRecord;
+  protected transient ScheduledExecutorService scanService;
+  protected transient LinkedBlockingDeque emitQueue;
   protected transient PreparedStatement ps;
   protected WindowDataManager windowManager;
--- End diff --

Perhaps a lot of the above variables can be private. Can you check which 
ones can be made private?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73326138
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(partitionCount);
--- End diff --

This means that if the number of partitions is set to more than one, a
thread pool with multiple threads will be created. I think we just need a single
thread.
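
A minimal sketch of the single-thread alternative, assuming the poller only needs one periodic task per operator instance. `SinglePollerSketch` and `distinctPollerThreads` are hypothetical names, and the lambda body stands in for the real database poll. `Executors.newSingleThreadScheduledExecutor()` is the standard JDK factory for a scheduler backed by one worker thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SinglePollerSketch {
  /** Runs `polls` scheduled polls and returns how many distinct threads executed them. */
  static int distinctPollerThreads(int polls, long intervalMillis) {
    // One worker thread, regardless of how many partitions are configured.
    ScheduledExecutorService scanService = Executors.newSingleThreadScheduledExecutor();
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    CountDownLatch done = new CountDownLatch(polls);
    try {
      scanService.scheduleWithFixedDelay(() -> {
        // Placeholder for the actual poll of the database.
        threadNames.add(Thread.currentThread().getName());
        done.countDown();
      }, 0, intervalMillis, TimeUnit.MILLISECONDS);
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scanService.shutdownNow();
    }
    return threadNames.size();
  }

  public static void main(String[] args) {
    System.out.println(distinctPollerThreads(3, 10));
  }
}
```

Each partition is deployed as its own operator instance, so sizing the pool by `partitionCount` inside a single instance would create threads that this sketch assumes are never used.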




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73326356
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
--- End diff --

You can remove IdleTimeHandler and the handleIdleTime() method. Nothing 
is being done there.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73326407
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
--- End diff --

Not needed. This can be removed.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73326772
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(partitionCount);
+}
 spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
 execute = true;
 cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
--- End diff --

Are you expecting rangeQueryPair from the user? If so, mark it 
```@NotNull```. It is not being set anywhere.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-03 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r7333
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(partitionCount);
+}
 spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
 execute = true;
 cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
--- End diff --

```rangeQueryPair``` must be created somewhere in setup() for this 
particular partition.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647507
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
--- End diff --

That will be calculated during the "definePartitions" phase.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647479
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -51,22 +64,22 @@
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.netlet.util.DTThrowable;
 
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.jooq.impl.DSL.field;
+
 /**
  * Abstract operator for for consuming data using JDBC interface
- * User needs User needs to provide
- * tableName,dbConnection,setEmitColumnList,look-up key 
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given
- * 
+ * User needs to provide tableName, dbConnection, columnsExpression, 
look-up key
+ * Optionally batchSize, pollInterval and a where clause can be given 
  * This operator uses static partitioning to arrive at range queries for 
exactly
  * once reads
  * This operator will create a configured number of non-polling static
  * partitions for fetching the existing data in the table. And an 
additional
  * single partition for polling additive data. Assumption is that there is 
an
  * ordered column using which range queries can be formed
- * If an emitColumnList is provided, please ensure that the keyColumn is 
the
+ * If an columnsExpression is provided, please ensure that the keyColumn 
is the
--- End diff --

I just wanted to be consistent with JDBCPojoInputOperator, which already 
exists in Malhar.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647661
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
--- End diff --

batchSize is optional; let me add a default value for it. What value 
would you suggest?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647746
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
--- End diff --

done.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647799
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
--- End diff --

Should we be consistent with the existing JDBCPojoInputOperator? People 
could then use the same set of properties with both operators.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73647809
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
   /**
* poll interval in milliseconds
*/
-  private static int pollInterval = 1;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 2;
+
   /**
* Map of windowId to  of the range key
*/
-  protected transient MutablePair 
currentWindowRecoveryState;
+  protected transient MutablePair 
currentWindowRecoveryState;
+  private transient DSLContext create;
 
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  protected KeyValPair rangeQueryPair;
+  protected Integer lower;
--- End diff --

yes :)





[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73648038
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(partitionCount);
--- End diff --

Right, made it "1"




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73648086
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, IdleTimeHandler, 
Partitioner>
--- End diff --

Okay, that was already there; I will remove it.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-04 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73648435
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(partitionCount);
+}
 spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
 execute = true;
 cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
--- End diff --

rangeQueryPair is calculated during "definePartitions" and the user is 
not supposed to set it. Let me try to make that part of the code more 
readable.
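
As a rough illustration of how such ranges could be derived, here is a 
minimal sketch that splits a known row count into static ranges and adds 
one open-ended range for the poller partition. The class RangeSplitSketch 
and the method split are hypothetical and are not the PR's 
getPartitionedQueryRangeMap; the only details taken from the diff are 
that bounds are row numbers and that the poller's upper bound is 
Integer.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch
{
  // Returns {lower, upper} row-offset pairs: one per static partition,
  // followed by a single open-ended pair for the polling partition.
  public static List<int[]> split(int rowCount, int staticPartitions)
  {
    List<int[]> ranges = new ArrayList<>();
    int size = rowCount / staticPartitions;
    int lower = 0;
    for (int i = 0; i < staticPartitions; i++) {
      // The last static partition absorbs any remainder rows.
      int upper = (i == staticPartitions - 1) ? rowCount : lower + size;
      ranges.add(new int[] {lower, upper});
      lower = upper;
    }
    // Poller partition: starts where the existing data ends, no fixed upper bound.
    ranges.add(new int[] {rowCount, Integer.MAX_VALUE});
    return ranges;
  }

  public static void main(String[] args)
  {
    for (int[] r : split(10, 3)) {
      System.out.println(r[0] + " .. " + r[1]);
    }
  }
}
```

Because the ranges come out of the partitioning step, each partition 
receives its own pair and nothing needs to be configured by the user.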




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73823756
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
+  private static int DEFAULT_BATCH_SIZE = 2000;
+  private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
-  protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
-  protected int batchSize;
-  protected static int fetchSize = 2;
-  /**
-   * Map of windowId to  of the range key
-   */
-  protected transient MutablePair 
currentWindowRecoveryState;
-
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String columnsExpression;
+  @NotNull
+  private String key;
+  private String whereCondition = null;
+  private long currentWindowId;
+  private WindowDataManager windowManager;
+
+  protected KeyValPair rangeQueryPair;
+  protected Integer lowerBound;
+  private transient int operatorId;
+  private transient DSLContext create;
   private transient volatile boolean execute;
-  private transient AtomicReference cause;
-  protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
-  protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  private transient ScheduledExecutorService scanService;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
+  protected transient Integer lastEmittedRecord;
--- End diff --

Can you make ```lowerBound```, ```lastPolledBound``` and 
```lastEmittedRecord``` ```int``` instead of ```Integer```?
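
The review does not state a reason, but one plausible motivation is that 
a boxed field defaults to null and fails on first arithmetic use, while a 
primitive defaults to 0. A minimal sketch of that difference follows; the 
class BoxedBoundSketch and both of its fields are hypothetical stand-ins, 
not the operator's actual members.

```java
public class BoxedBoundSketch
{
  static Integer boxedBound;   // boxed: defaults to null until assigned
  static int primitiveBound;   // primitive: defaults to 0

  // Returns true if using the unassigned boxed field in arithmetic throws.
  public static boolean boxedThrowsOnUse()
  {
    try {
      int next = boxedBound + 1; // auto-unboxing null raises NullPointerException
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // The primitive field can be used immediately.
  public static int primitiveNext()
  {
    return primitiveBound + 1;
  }

  public static void main(String[] args)
  {
    System.out.println("boxed throws: " + boxedThrowsOnUse());
    System.out.println("primitive next: " + primitiveNext());
  }
}
```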




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73824180
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
--- End diff --

This seems too large for the queue capacity. With 4 * 1024 * 1024 entries, 
even 100-byte records would put roughly 400 MB of data in the queue!
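
The estimate can be checked with a small sketch. It assumes a full queue and counts only the record payload, ignoring object headers and queue node overhead; the class and method names are hypothetical:

```java
public class QueueFootprintSketch
{
  // Payload bytes held by a full queue; long arithmetic avoids int overflow.
  static long estimatedPayloadBytes(int queueCapacity, int bytesPerRecord)
  {
    return (long)queueCapacity * bytesPerRecord;
  }

  public static void main(String[] args)
  {
    int capacity = 4 * 1024 * 1024; // DEFAULT_QUEUE_CAPACITY from the diff
    long bytes = estimatedPayloadBytes(capacity, 100);
    System.out.println(bytes + " bytes = " + (bytes / (1024 * 1024)) + " MiB");
  }
}
```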




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73826686
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
*/
   @Override
   public 
Collection>>
 definePartitions(
-  
Collection>>
 partitions,
-  com.datatorrent.api.Partitioner.PartitioningContext context)
+  Collection>> partitions, 
PartitioningContext context)
   {
 List>> newPartitions = new 
ArrayList>>(
 getPartitionCount());
-JdbcStore jdbcStore = new JdbcStore();
-jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
-jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
-jdbcStore.setConnectionProperties(store.getConnectionProperties());
 
-jdbcStore.connect();
-
-HashMap> partitionToRangeMap = 
null;
+HashMap> partitionToRangeMap = 
null;
 try {
-  partitionToRangeMap = 
JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
-  jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), 
getTableName(), getKey(),
-  store.getConnectionProperties().getProperty(user), 
store.getConnectionProperties().getProperty(password),
-  whereCondition, emitColumnList);
+  store.connect();
+  intializeDSLContext();
+  partitionToRangeMap = 
getPartitionedQueryRangeMap(getPartitionCount());
 } catch (SQLException e) {
   LOG.error("Exception in initializing the partition range", e);
+  throw new RuntimeException(e);
+} finally {
+  store.disconnect();
 }
 
 KryoCloneUtils> cloneUtils = 
KryoCloneUtils.createCloneUtils(this);
 
+// The n given partitions are for range queries and n + 1 partition is 
for polling query
 for (int i = 0; i <= getPartitionCount(); i++) {
-  AbstractJdbcPollInputOperator jdbcPoller = null;
-
-  jdbcPoller = cloneUtils.getClone();
-
-  jdbcPoller.setStore(store);
-  jdbcPoller.setKey(getKey());
-  jdbcPoller.setPartitionCount(getPartitionCount());
-  jdbcPoller.setPollInterval(getPollInterval());
-  jdbcPoller.setTableName(getTableName());
-  jdbcPoller.setBatchSize(getBatchSize());
-  jdbcPoller.setEmitColumnList(getEmitColumnList());
-
-  store.connect();
-  //The n given partitions are for range queries and n + 1 partition 
is for polling query
-  //The upper bound for the n+1 partition is set to null since its a 
pollable partition
+  AbstractJdbcPollInputOperator jdbcPoller = cloneUtils.getClone();
   if (i < getPartitionCount()) {
-jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
-isPollable = false;
+jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
--- End diff --

Set ```jdbcPoller.isPollerPartition = false``` here?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73828043
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
-spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(1);
+}
 execute = true;
-cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   } else {
 ps = store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-isPollable = true;
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   }
-
 } catch (SQLException e) {
   LOG.error("Exception in initializing the range query for a given 
partition", e);
   throw new RuntimeException(e);
 }
 
-windowManager.setup(context);
-LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
 currentWindowId = windowId;
-
-isReplayed = false;
-
 if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
   try {
 replay(currentWindowId);
+return;
   } catch (SQLException e) {
 LOG.error("Exception in replayed windows", e);
 throw new RuntimeException(e);
   }
 }
-
-if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
-  try {
-if (!isPollable && rangeQueryPair.getValue() != null) {
-
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
-  rangeQueryPair.getValue()),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-} else {
-  String bound = null;
-  if (previousUpperBound == null) {
-bound = getRangeQueryPair().getKey();
-  } else {
-bound = previousUpperBound;
-  }
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-  isPollable = true;
-}
-isReplayed = false;
-LOG.debug("Prepared statement after re-initialization - {} ", 
ps.toString());
-  } catch (SQLException e) {
-  

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73828487
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
+  private static int DEFAULT_BATCH_SIZE = 2000;
+  private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
-  protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
-  protected int batchSize;
-  protected static int fetchSize = 2;
-  /**
-   * Map of windowId to  of the range key
-   */
-  protected transient MutablePair 
currentWindowRecoveryState;
-
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String columnsExpression;
+  @NotNull
+  private String key;
+  private String whereCondition = null;
+  private long currentWindowId;
+  private WindowDataManager windowManager;
+
+  protected KeyValPair rangeQueryPair;
+  protected Integer lowerBound;
+  private transient int operatorId;
+  private transient DSLContext create;
   private transient volatile boolean execute;
-  private transient AtomicReference cause;
-  protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
-  protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue> emitQueue;
+  private transient ScheduledExecutorService scanService;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
--- End diff --

```upperBound```? That name seems more intuitive given the rest of 
the code.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73830019
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --

Should this fetch data from ```recoveredData.left``` to 
```recoveredData.right```?
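
A rough sketch of the replay range, assuming ```buildRangeQuery``` takes an offset and a row count (as the existing call with ```getValue() - getKey()``` suggests) and that ```recoveredData.left``` and ```recoveredData.right``` hold the start and end bounds recorded for the window. The helper below is hypothetical and only shows the arithmetic:

```java
public class ReplayRangeSketch
{
  // Returns {offset, limit} for replaying one window from its recovered bounds.
  static int[] replayOffsetAndLimit(int recoveredLeft, int recoveredRight)
  {
    if (recoveredRight < recoveredLeft) {
      throw new IllegalArgumentException("right bound is below left bound");
    }
    return new int[] {recoveredLeft, recoveredRight - recoveredLeft};
  }

  public static void main(String[] args)
  {
    int[] range = replayOffsetAndLimit(200, 350);
    System.out.println("offset=" + range[0] + ", limit=" + range[1]);
  }
}
```

Whether the right bound is inclusive depends on how the window state is saved, so the limit may need a ```+ 1```.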




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73838062
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
--- End diff --

```exactlyOnce``` => ```idempotent```




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73838593
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+LOG.info("Query formed to recover data - {}", ps.toString());
 
-jdbcPoller.setRangeQueryPair(new KeyValPair(recoveredData.left, recoveredData.right));
+emitReplayedTuples(ps);
 
-jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), 
jdbcPoller.getKey(),
-jdbcPoller.getRangeQueryPair().getKey(), 
jdbcPoller.getRangeQueryPair().getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-LOG.info("Query formed for recovered data - {}", 
jdbcPoller.ps.toString());
+  }
 
-emi

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73840151
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
--- End diff --

Does this mean that we want to poll records just once in a window?
Is this because the poll interval is assumed to be long enough to prevent 
multiple calls to ```pollRecords()``` within a single window?
Even then, if the poll interval is reduced, there might be multiple calls, 
of which only one will actually poll.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73840984
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

The ```finally``` block runs even when no exception is thrown, so the store 
is disconnected after every poll, which will create problems.
We should also let the exception reach the operator thread; otherwise it 
might get lost in the poller thread.
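
One way to hand the failure over, sketched under assumptions: a task submitted with ```scheduleAtFixedRate``` that throws has its later runs suppressed and the exception stays inside the returned ```Future```, so nothing surfaces unless it is recorded somewhere. The class below is hypothetical and mirrors the ```AtomicReference``` cause field that the old code had; it is not the PR's implementation:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PollerErrorHandoffSketch
{
  // First failure seen on the poller thread, read later by the operator thread.
  private final AtomicReference<Throwable> cause = new AtomicReference<>();

  // Wraps the polling task so that a failure is recorded instead of being swallowed.
  Runnable wrap(Runnable poll)
  {
    return () -> {
      try {
        poll.run();
      } catch (Throwable t) {
        cause.compareAndSet(null, t);
      }
    };
  }

  // Meant to be called from the operator thread, for example at the start of emitTuples().
  void rethrowIfFailed()
  {
    Throwable t = cause.get();
    if (t != null) {
      throw new RuntimeException("Poller thread failed", t);
    }
  }

  public static void main(String[] args) throws Exception
  {
    PollerErrorHandoffSketch sketch = new PollerErrorHandoffSketch();
    ScheduledExecutorService scanService = Executors.newSingleThreadScheduledExecutor();
    Runnable failingPoll = () -> {
      throw new IllegalStateException("simulated query failure");
    };
    // Wait for the single run to finish; the wrapper keeps the Future itself clean.
    scanService.schedule(sketch.wrap(failingPoll), 0, TimeUnit.MILLISECONDS).get();
    scanService.shutdownNow();
    try {
      sketch.rethrowIfFailed();
    } catch (RuntimeException e) {
      System.out.println("operator thread saw: " + e.getCause().getMessage());
    }
  }
}
```

The disconnect could then happen only in ```deactivate()``` or on the failure path, rather than in a ```finally``` after every successful poll.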




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841049
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
-spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(1);
+}
 execute = true;
-cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   } else {
 ps = store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-isPollable = true;
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   }
-
 } catch (SQLException e) {
   LOG.error("Exception in initializing the range query for a given 
partition", e);
   throw new RuntimeException(e);
 }
 
-windowManager.setup(context);
-LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
 currentWindowId = windowId;
-
-isReplayed = false;
-
 if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
   try {
 replay(currentWindowId);
+return;
   } catch (SQLException e) {
 LOG.error("Exception in replayed windows", e);
 throw new RuntimeException(e);
   }
 }
-
-if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
-  try {
-if (!isPollable && rangeQueryPair.getValue() != null) {
-
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
-  rangeQueryPair.getValue()),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-} else {
-  String bound = null;
-  if (previousUpperBound == null) {
-bound = getRangeQueryPair().getKey();
-  } else {
-bound = previousUpperBound;
-  }
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-  isPollable = true;
-}
-isReplayed = false;
-LOG.debug("Prepared statement after re-initialization - {} ", 
ps.toString());
-  } catch (SQLException e) {
-

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841414
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --

Right, will update.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841763
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+LOG.info("Query formed to recover data - {}", ps.toString());
 
-jdbcPoller.setRangeQueryPair(new KeyValPair(recoveredData.left, recoveredData.right));
+emitReplayedTuples(ps);
 
-jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), 
jdbcPoller.getKey(),
-jdbcPoller.getRangeQueryPair().getKey(), 
jdbcPoller.getRangeQueryPair().getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-LOG.info("Query formed for recovered data - {}", 
jdbcPoller.ps.toString());
+  }
 
-emitR

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841819
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
--- End diff --

Can this be removed?




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73842885
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
--- End diff --

Use ```offer``` instead of ```add```; otherwise we might get an ```IllegalStateException``` when the queue is full.
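
To illustrate the difference, here is a minimal standalone sketch (not the operator code). It assumes a capacity-bounded `LinkedBlockingDeque`, as in the new `setup()`. The class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
public class BoundedQueueDemo
{
  /** Returns true if add() on a full bounded queue throws IllegalStateException. */
  public static boolean addThrowsWhenFull()
  {
    BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
    queue.add("first"); // fills the queue to capacity
    try {
      queue.add("second"); // no space left, so add() throws
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  /** Returns true if offer() on a full bounded queue reports failure instead of throwing. */
  public static boolean offerReturnsFalseWhenFull()
  {
    BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
    queue.offer("first");
    return !queue.offer("second"); // offer() returns false when the queue is full
  }

  public static void main(String[] args)
  {
    System.out.println("add throws when full: " + addThrowsWhenFull());
    System.out.println("offer rejects when full: " + offerReturnsFalseWhenFull());
  }
}
```

Note that ```offer``` drops the element if its return value is ignored, so the poller would probably need to check the result, or use ```put``` or the timed ```offer(e, timeout, unit)``` to wait for space.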




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73843244
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
-spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+intializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(1);
+}
 execute = true;
-cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue>(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void intializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   } else {
 ps = store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-isPollable = true;
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   }
-
 } catch (SQLException e) {
   LOG.error("Exception in initializing the range query for a given 
partition", e);
   throw new RuntimeException(e);
 }
 
-windowManager.setup(context);
-LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
 currentWindowId = windowId;
-
-isReplayed = false;
-
 if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
   try {
 replay(currentWindowId);
+return;
   } catch (SQLException e) {
 LOG.error("Exception in replayed windows", e);
 throw new RuntimeException(e);
   }
 }
-
-if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
-  try {
-if (!isPollable && rangeQueryPair.getValue() != null) {
-
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
-  rangeQueryPair.getValue()),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-} else {
-  String bound = null;
-  if (previousUpperBound == null) {
-bound = getRangeQueryPair().getKey();
-  } else {
-bound = previousUpperBound;
-  }
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-  isPollable = true;
-}
-isReplayed = false;
-LOG.debug("Prepared statement after re-initialization - {} ", 
ps.toString());
-  } catch (SQLException e) {
-  

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73843385
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
--- End diff --

Changing the size to 4k.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73845347
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
--- End diff --

The thread execution is not bound to the window. Even if the pollInterval
is short and there are multiple calls to pollRecords, there is only a single
thread, so the subsequent calls will block until this one is done. And isPolled
will be "true" for subsequent calls of non-poller threads.

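The single-thread behaviour described above can be checked with a small standalone sketch. It assumes the same kind of scheduler as `scanService` (`Executors.newScheduledThreadPool(1)` with `scheduleAtFixedRate`). The class name, timings and counters are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it is not part of the operator.
public class SingleThreadPollDemo
{
  /** Runs a slow task at a short fixed rate and returns the peak number of concurrent runs. */
  public static int maxConcurrentRuns(int runs)
  {
    final AtomicInteger active = new AtomicInteger();
    final AtomicInteger maxActive = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(runs);
    ScheduledExecutorService scanService = Executors.newScheduledThreadPool(1);

    scanService.scheduleAtFixedRate(() -> {
      int now = active.incrementAndGet();
      maxActive.accumulateAndGet(now, Math::max);
      try {
        Thread.sleep(20); // each "poll" takes longer than the 5 ms period
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        active.decrementAndGet();
        done.countDown();
      }
    }, 0, 5, TimeUnit.MILLISECONDS);

    try {
      done.await(5, TimeUnit.SECONDS); // wait for the requested number of runs
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scanService.shutdownNow();
    }
    return maxActive.get();
  }

  public static void main(String[] args)
  {
    System.out.println("max concurrent runs: " + maxConcurrentRuns(3));
  }
}
```

Runs that take longer than the period start late rather than overlapping, which is why a plain flag check at the top of the task is enough to skip later invocations.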



[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73848661
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

Calling disconnect for non-poller threads, as they are not going to connect
to the db again; also updated exception handling.




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73848852
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
--- End diff --

Removed this and a couple of other lines :)




[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73859472
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner>
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
--- End diff --

Can you mark this class with ```checkpointableWithinAppWindow = false```?
Better to have it in the abstract class.

