Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74011772
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---
    @@ -18,214 +18,75 @@
      */
     package com.datatorrent.lib.db.jdbc;
     
    -import java.sql.PreparedStatement;
     import java.sql.ResultSet;
     import java.sql.SQLException;
     import java.util.ArrayList;
    -import java.util.List;
    -
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
     import org.apache.hadoop.classification.InterfaceStability.Evolving;
     
     import com.google.common.collect.Lists;
     
     import com.datatorrent.api.Context.OperatorContext;
    -import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
     
     /**
    - * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
    + * A concrete implementation for {@link AbstractJdbcPollInputOperator} for
      * consuming data from MySQL using JDBC interface <br>
    - * User needs to provide tableName,dbConnection,setEmitColumnList,look-up 
key
    - * <br>
    - * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given
    - * <br>
    + * User needs to provide tableName,dbConnection,columnsExpression,look-up 
key <br>
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given <br>
      * This operator uses static partitioning to arrive at range queries for 
exactly
      * once reads<br>
      * Assumption is that there is an ordered column using which range queries 
can
      * be formed<br>
    - * If an emitColumnList is provided, please ensure that the keyColumn is 
the
    + * If an columnsExpression is provided, please ensure that the keyColumn 
is the
      * first column in the list<br>
    - * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    - * comma separated list of the emit columns eg columnA,columnB,columnC
      * 
      * @displayName Jdbc Polling Input Operator
      * @category Input
      * @tags database, sql, jdbc
      */
     @Evolving
    -@OperatorAnnotation(checkpointableWithinAppWindow = false)
    -public class JdbcPollInputOperator extends 
AbstractJdbcPollInputOperator<Object>
    +public class JdbcPollInputOperator extends 
AbstractJdbcPollInputOperator<String>
     {
    -  private long lastBatchWindowId;
    -  private transient long currentWindowId;
    -  private long lastCreationTsMillis;
    -  private long fetchBackMillis = 0L;
    +  @OutputPortFieldAnnotation
    +  public final transient DefaultOutputPort<String> outputPort = new 
DefaultOutputPort<>();
       private ArrayList<String> emitColumns;
    -  private transient int count = 0;
    -
    -  /**
    -   * Returns the emit columns
    -   */
    -  public List<String> getEmitColumns()
    -  {
    -    return emitColumns;
    -  }
    -
    -  /**
    -   * Sets the emit columns
    -   */
    -  public void setEmitColumns(ArrayList<String> emitColumns)
    -  {
    -    this.emitColumns = emitColumns;
    -  }
    -
    -  /**
    -   * Returns fetchBackMilis
    -   */
    -  public long getFetchBackMillis()
    -  {
    -    return fetchBackMillis;
    -  }
    -
    -  /**
    -   * Sets fetchBackMilis - used in polling
    -   */
    -  public void setFetchBackMillis(long fetchBackMillis)
    -  {
    -    this.fetchBackMillis = fetchBackMillis;
    -  }
     
       @Override
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    parseEmitColumnList(getEmitColumnList());
    -    lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
    +    parseEmitColumnList();
       }
     
    -  private void parseEmitColumnList(String columnList)
    +  private void parseEmitColumnList()
       {
    -    String[] cols = columnList.split(",");
    -    ArrayList<String> arr = Lists.newArrayList();
    +    String[] cols = getColumnsExpression().split(",");
    +    emitColumns = Lists.newArrayList();
         for (int i = 0; i < cols.length; i++) {
    -      arr.add(cols[i]);
    +      emitColumns.add(cols[i]);
         }
    -    setEmitColumns(arr);
    -  }
    -
    -  @Override
    -  public void beginWindow(long l)
    -  {
    -    super.beginWindow(l);
    -    currentWindowId = l;
       }
     
       @Override
    -  protected void pollRecords(PreparedStatement ps)
    +  public String getTuple(ResultSet rs)
       {
    -    ResultSet rs = null;
    -    List<Object> metaList = new ArrayList<>();
    -
    -    if (isReplayed) {
    -      return;
    -    }
    -
    +    StringBuilder resultTuple = new StringBuilder();
         try {
    -      if (ps.isClosed()) {
    -        LOG.debug("Returning due to closed ps for non-pollable 
partitions");
    -        return;
    +      for (String obj : emitColumns) {
    +        resultTuple.append(rs.getObject(obj) + ",");
           }
    +      resultTuple.substring(0, resultTuple.length() - 1); //remove last 
comma
    --- End diff --
    
    Need to assign the returned substring to ```resultTuple```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to