[
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413126#comment-15413126
]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r74011772
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---
@@ -18,214 +18,75 @@
*/
package com.datatorrent.lib.db.jdbc;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
/**
- * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
+ * A concrete implementation for {@link AbstractJdbcPollInputOperator} for
* consuming data from MySQL using JDBC interface <br>
- * User needs to provide tableName,dbConnection,setEmitColumnList,look-up
key
- * <br>
- * Optionally batchSize,pollInterval,Look-up key and a where clause can be
given
- * <br>
+ * User needs to provide tableName,dbConnection,columnsExpression,look-up
key <br>
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be
given <br>
* This operator uses static partitioning to arrive at range queries for
exactly
* once reads<br>
* Assumption is that there is an ordered column using which range queries
can
* be formed<br>
- * If an emitColumnList is provided, please ensure that the keyColumn is
the
+ * If a columnsExpression is provided, please ensure that the keyColumn
is the
* first column in the list<br>
- * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
- * comma separated list of the emit columns eg columnA,columnB,columnC
*
* @displayName Jdbc Polling Input Operator
* @category Input
* @tags database, sql, jdbc
*/
@Evolving
-@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class JdbcPollInputOperator extends
AbstractJdbcPollInputOperator<Object>
+public class JdbcPollInputOperator extends
AbstractJdbcPollInputOperator<String>
{
- private long lastBatchWindowId;
- private transient long currentWindowId;
- private long lastCreationTsMillis;
- private long fetchBackMillis = 0L;
+ @OutputPortFieldAnnotation
+ public final transient DefaultOutputPort<String> outputPort = new
DefaultOutputPort<>();
private ArrayList<String> emitColumns;
- private transient int count = 0;
-
- /**
- * Returns the emit columns
- */
- public List<String> getEmitColumns()
- {
- return emitColumns;
- }
-
- /**
- * Sets the emit columns
- */
- public void setEmitColumns(ArrayList<String> emitColumns)
- {
- this.emitColumns = emitColumns;
- }
-
- /**
- * Returns fetchBackMilis
- */
- public long getFetchBackMillis()
- {
- return fetchBackMillis;
- }
-
- /**
- * Sets fetchBackMilis - used in polling
- */
- public void setFetchBackMillis(long fetchBackMillis)
- {
- this.fetchBackMillis = fetchBackMillis;
- }
@Override
public void setup(OperatorContext context)
{
super.setup(context);
- parseEmitColumnList(getEmitColumnList());
- lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
+ parseEmitColumnList();
}
- private void parseEmitColumnList(String columnList)
+ private void parseEmitColumnList()
{
- String[] cols = columnList.split(",");
- ArrayList<String> arr = Lists.newArrayList();
+ String[] cols = getColumnsExpression().split(",");
+ emitColumns = Lists.newArrayList();
for (int i = 0; i < cols.length; i++) {
- arr.add(cols[i]);
+ emitColumns.add(cols[i]);
}
- setEmitColumns(arr);
- }
-
- @Override
- public void beginWindow(long l)
- {
- super.beginWindow(l);
- currentWindowId = l;
}
@Override
- protected void pollRecords(PreparedStatement ps)
+ public String getTuple(ResultSet rs)
{
- ResultSet rs = null;
- List<Object> metaList = new ArrayList<>();
-
- if (isReplayed) {
- return;
- }
-
+ StringBuilder resultTuple = new StringBuilder();
try {
- if (ps.isClosed()) {
- LOG.debug("Returning due to closed ps for non-pollable
partitions");
- return;
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
}
+ resultTuple.substring(0, resultTuple.length() - 1); //remove last
comma
--- End diff --
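The returned substring is discarded. `StringBuilder.substring` returns a new `String` and does not modify `resultTuple`, so the trailing comma is never removed. Because `resultTuple` is a `StringBuilder`, that `String` cannot simply be assigned back to it. Either use the substring as the return value, or trim the builder in place with ```resultTuple.setLength(resultTuple.length() - 1)```.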
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JdbcPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)