[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411374#comment-15411374
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r73826686
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
*/
@Override
public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
- Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
- com.datatorrent.api.Partitioner.PartitioningContext context)
+ Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
{
List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
getPartitionCount());
- JdbcStore jdbcStore = new JdbcStore();
- jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
- jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
- jdbcStore.setConnectionProperties(store.getConnectionProperties());
- jdbcStore.connect();
-
- HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
+ HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
try {
- partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
- jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
- store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
- whereCondition, emitColumnList);
+ store.connect();
+ intializeDSLContext();
+ partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
} catch (SQLException e) {
LOG.error("Exception in initializing the partition range", e);
+ throw new RuntimeException(e);
+ } finally {
+ store.disconnect();
}
KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+ // The n given partitions are for range queries and n + 1 partition is for polling query
for (int i = 0; i <= getPartitionCount(); i++) {
- AbstractJdbcPollInputOperator<T> jdbcPoller = null;
-
- jdbcPoller = cloneUtils.getClone();
-
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- jdbcPoller.setEmitColumnList(getEmitColumnList());
-
- store.connect();
- //The n given partitions are for range queries and n + 1 partition is for polling query
- //The upper bound for the n+1 partition is set to null since its a pollable partition
+ AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
if (i < getPartitionCount()) {
- jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
- isPollable = false;
+ jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+ jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
--- End diff --
Should `jdbcPoller.isPollerPartition = false` be set explicitly here, for the range-query partitions?
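To illustrate the question: a minimal sketch of the n range partitions plus one poller partition, with the flag assigned in both branches. `PollerSketch` and `copy()` are hypothetical stand-ins (for the operator and for the Kryo clone respectively), and the `else` branch is an assumption, since the diff above ends before it. The point is only that a clone carries over whatever value the source operator holds, so relying on the default may not be safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the operator; not the code in the PR.
final class PollerSketch {
  int[] rangeQueryPair;       // {lower, upper} for a range partition
  int lastEmittedRecord;
  boolean isPollerPartition;

  // Stand-in for a deep clone: every field value of the source is carried over.
  PollerSketch copy() {
    PollerSketch c = new PollerSketch();
    c.rangeQueryPair = rangeQueryPair;
    c.lastEmittedRecord = lastEmittedRecord;
    c.isPollerPartition = isPollerPartition;
    return c;
  }

  // Builds n range partitions followed by one poller partition.
  List<PollerSketch> definePartitions(List<int[]> ranges) {
    int n = ranges.size();
    List<PollerSketch> result = new ArrayList<>(n + 1);
    for (int i = 0; i <= n; i++) {
      PollerSketch p = copy();
      if (i < n) {
        p.rangeQueryPair = ranges.get(i);
        p.lastEmittedRecord = ranges.get(i)[0];
        p.isPollerPartition = false; // explicit, so a cloned 'true' cannot leak in
      } else {
        p.isPollerPartition = true;  // assumption: the extra partition is the poller
      }
      result.add(p);
    }
    return result;
  }
}
```

If the source operator already has the flag set to true (for example after a repartition), the explicit assignment is what keeps the range partitions from behaving as pollers in this sketch.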
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
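For reference, item 4 could look roughly like the sketch below. This is not the code from the pull request: `RowCountRanges` and `split` are hypothetical names, the ranges are treated as half-open row offsets, and giving the remainder rows to the last range is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the operator's actual code: splits rowCount rows
// into `partitions` contiguous [lower, upper) row-offset ranges.
final class RowCountRanges {
  private RowCountRanges() {}

  static List<int[]> split(int rowCount, int partitions) {
    if (partitions <= 0) {
      throw new IllegalArgumentException("partitions must be positive");
    }
    if (rowCount < 0) {
      throw new IllegalArgumentException("rowCount must not be negative");
    }
    int rowsPerPartition = rowCount / partitions;
    List<int[]> ranges = new ArrayList<>(partitions);
    for (int i = 0; i < partitions; i++) {
      int lower = i * rowsPerPartition;
      // The last range absorbs the remainder so that every row is covered.
      int upper = (i == partitions - 1) ? rowCount : lower + rowsPerPartition;
      ranges.add(new int[] {lower, upper});
    }
    return ranges;
  }
}
```

Splitting on row offsets rather than key values keeps the ranges balanced even when the key column is sparse or non-numeric; how each range is turned into a query is left out here.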