[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413126#comment-15413126 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74011772
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---
    @@ -18,214 +18,75 @@
      */
     package com.datatorrent.lib.db.jdbc;
     
    -import java.sql.PreparedStatement;
     import java.sql.ResultSet;
     import java.sql.SQLException;
     import java.util.ArrayList;
    -import java.util.List;
    -
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
     import org.apache.hadoop.classification.InterfaceStability.Evolving;
     
     import com.google.common.collect.Lists;
     
     import com.datatorrent.api.Context.OperatorContext;
    -import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
     
     /**
    - * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
    + * A concrete implementation for {@link AbstractJdbcPollInputOperator} for
      * consuming data from MySQL using JDBC interface <br>
    - * User needs to provide tableName,dbConnection,setEmitColumnList,look-up 
key
    - * <br>
    - * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given
    - * <br>
    + * User needs to provide tableName,dbConnection,columnsExpression,look-up 
key <br>
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given <br>
      * This operator uses static partitioning to arrive at range queries for 
exactly
      * once reads<br>
      * Assumption is that there is an ordered column using which range queries 
can
      * be formed<br>
    - * If an emitColumnList is provided, please ensure that the keyColumn is 
the
    + * If an columnsExpression is provided, please ensure that the keyColumn 
is the
      * first column in the list<br>
    - * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    - * comma separated list of the emit columns eg columnA,columnB,columnC
      * 
      * @displayName Jdbc Polling Input Operator
      * @category Input
      * @tags database, sql, jdbc
      */
     @Evolving
    -@OperatorAnnotation(checkpointableWithinAppWindow = false)
    -public class JdbcPollInputOperator extends 
AbstractJdbcPollInputOperator<Object>
    +public class JdbcPollInputOperator extends 
AbstractJdbcPollInputOperator<String>
     {
    -  private long lastBatchWindowId;
    -  private transient long currentWindowId;
    -  private long lastCreationTsMillis;
    -  private long fetchBackMillis = 0L;
    +  @OutputPortFieldAnnotation
    +  public final transient DefaultOutputPort<String> outputPort = new 
DefaultOutputPort<>();
       private ArrayList<String> emitColumns;
    -  private transient int count = 0;
    -
    -  /**
    -   * Returns the emit columns
    -   */
    -  public List<String> getEmitColumns()
    -  {
    -    return emitColumns;
    -  }
    -
    -  /**
    -   * Sets the emit columns
    -   */
    -  public void setEmitColumns(ArrayList<String> emitColumns)
    -  {
    -    this.emitColumns = emitColumns;
    -  }
    -
    -  /**
    -   * Returns fetchBackMilis
    -   */
    -  public long getFetchBackMillis()
    -  {
    -    return fetchBackMillis;
    -  }
    -
    -  /**
    -   * Sets fetchBackMilis - used in polling
    -   */
    -  public void setFetchBackMillis(long fetchBackMillis)
    -  {
    -    this.fetchBackMillis = fetchBackMillis;
    -  }
     
       @Override
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    parseEmitColumnList(getEmitColumnList());
    -    lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
    +    parseEmitColumnList();
       }
     
    -  private void parseEmitColumnList(String columnList)
    +  private void parseEmitColumnList()
       {
    -    String[] cols = columnList.split(",");
    -    ArrayList<String> arr = Lists.newArrayList();
    +    String[] cols = getColumnsExpression().split(",");
    +    emitColumns = Lists.newArrayList();
         for (int i = 0; i < cols.length; i++) {
    -      arr.add(cols[i]);
    +      emitColumns.add(cols[i]);
         }
    -    setEmitColumns(arr);
    -  }
    -
    -  @Override
    -  public void beginWindow(long l)
    -  {
    -    super.beginWindow(l);
    -    currentWindowId = l;
       }
     
       @Override
    -  protected void pollRecords(PreparedStatement ps)
    +  public String getTuple(ResultSet rs)
       {
    -    ResultSet rs = null;
    -    List<Object> metaList = new ArrayList<>();
    -
    -    if (isReplayed) {
    -      return;
    -    }
    -
    +    StringBuilder resultTuple = new StringBuilder();
         try {
    -      if (ps.isClosed()) {
    -        LOG.debug("Returning due to closed ps for non-pollable 
partitions");
    -        return;
    +      for (String obj : emitColumns) {
    +        resultTuple.append(rs.getObject(obj) + ",");
           }
    +      resultTuple.substring(0, resultTuple.length() - 1); //remove last 
comma
    --- End diff --
    
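    The value returned by `substring` is discarded here, so the trailing comma is never actually removed. Need to assign the returned substring to `resultTuple` (or, since `resultTuple` is a `StringBuilder`, truncate it in place, e.g. with `setLength`).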


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to