[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405749#comment-15405749 ]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73321098

--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -80,201 +93,48 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
-    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
+    ActivationListener<OperatorContext>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
 {
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 20000;
   /**
    * poll interval in milliseconds
    */
-  private static int pollInterval = 10000;
+  private int pollInterval = DEFAULT_POLL_INTERVAL;
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
   @Min(1)
   private int partitionCount = 1;
   protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
+  protected boolean isPollerPartition;
   protected int batchSize;
-  protected static int fetchSize = 20000;
+
   /**
    * Map of windowId to <lower bound,upper bound> of the range key
    */
-  protected transient MutablePair<String, String> currentWindowRecoveryState;
+  protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
+  private transient DSLContext create;
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
   private transient volatile boolean execute;
   private transient AtomicReference<Throwable> cause;
+  private String tableName;
+  private String columnsExpression;
+  private String whereCondition = null;
+  private String key;
   protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
   protected long currentWindowId;
-  protected KeyValPair<String, String> rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue<List<T>> emitQueue;
+  protected KeyValPair<Integer, Integer> rangeQueryPair;
+  protected Integer lower;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
+  protected transient Integer lastEmittedRecord;
+  protected transient ScheduledExecutorService scanService;
+  protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
   protected WindowDataManager windowManager;
--- End diff --

Perhaps a lot of the above variables can be private. Can you check which ones can be made private?

> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)