Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r74011772 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java --- @@ -18,214 +18,75 @@ */ package com.datatorrent.lib.db.jdbc; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Evolving; import com.google.common.collect.Lists; import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; /** - * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for + * A concrete implementation for {@link AbstractJdbcPollInputOperator} for * consuming data from MySQL using JDBC interface <br> - * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key - * <br> - * Optionally batchSize,pollInterval,Look-up key and a where clause can be given - * <br> + * User needs to provide tableName,dbConnection,columnsExpression,look-up key <br> + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given <br> * This operator uses static partitioning to arrive at range queries for exactly * once reads<br> * Assumption is that there is an ordered column using which range queries can * be formed<br> - * If an emitColumnList is provided, please ensure that the keyColumn is the + * If an columnsExpression is provided, please ensure that the keyColumn is the * first column in the list<br> - * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - - * comma separated list of the emit columns eg columnA,columnB,columnC * * @displayName Jdbc Polling Input Operator * @category Input * @tags database, sql, jdbc */ @Evolving -@OperatorAnnotation(checkpointableWithinAppWindow = false) -public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object> +public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String> { - private long lastBatchWindowId; - private transient long currentWindowId; - private long lastCreationTsMillis; - private long fetchBackMillis = 0L; + @OutputPortFieldAnnotation + public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>(); private ArrayList<String> emitColumns; - private transient int count = 0; - - /** - * Returns the emit columns - */ - public List<String> getEmitColumns() - { - return emitColumns; - } - - /** - * Sets the emit columns - */ - public void setEmitColumns(ArrayList<String> emitColumns) - { - this.emitColumns = emitColumns; - } - - /** - * Returns fetchBackMilis - */ - public long getFetchBackMillis() - { - return fetchBackMillis; - } - - /** - * Sets fetchBackMilis - used in polling - */ - public void setFetchBackMillis(long fetchBackMillis) - { - this.fetchBackMillis = fetchBackMillis; - } @Override public void setup(OperatorContext context) { super.setup(context); - parseEmitColumnList(getEmitColumnList()); - lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis; + parseEmitColumnList(); } - private void parseEmitColumnList(String columnList) + private void parseEmitColumnList() { - String[] cols = columnList.split(","); - ArrayList<String> arr = Lists.newArrayList(); + String[] cols = getColumnsExpression().split(","); + emitColumns = Lists.newArrayList(); for (int i = 0; i < cols.length; i++) { - arr.add(cols[i]); + emitColumns.add(cols[i]); } - setEmitColumns(arr); - } - - @Override - public void beginWindow(long l) - { - super.beginWindow(l); - currentWindowId = l; } @Override - protected void pollRecords(PreparedStatement ps) + public String getTuple(ResultSet rs) { - ResultSet rs = null; - List<Object> metaList = new ArrayList<>(); - - if (isReplayed) { - return; - } - + StringBuilder resultTuple = new StringBuilder(); try { - if (ps.isClosed()) { - LOG.debug("Returning due to closed ps for non-pollable partitions"); - return; + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); } + resultTuple.substring(0, resultTuple.length() - 1); //remove last comma --- End diff -- Need to assign the returned substring to ```resultTuple```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---