[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323501#comment-15323501
 ] 

ASF GitHub Bot commented on APEXMALHAR-1966:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/295#discussion_r66532961
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---
    @@ -162,35 +160,75 @@ public void activate(Context.OperatorContext context)
           }
           getters.add(getter);
         }
    +    super.activate(context);
    +  }
    +
    +  private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
    +  {
    +    fieldInfos = Lists.newArrayList();
    +    Field[] fields = pojoClass.getDeclaredFields();
    +    for (int i = 0; i < rsMetaData.size(); i++) {
    +      String columnName = rsMetaData.getName(i);
    +      String pojoField = getMatchingField(fields, columnName);
    +      if (pojoField != null && pojoField.length() != 0) {
    +        fieldInfos.add(new FieldInfo(columnName, pojoField, null));
    +      } else {
    +        LOG.error("Couldn't find corrosponding pojo field for column: " + columnName);
    +      }
    +    }
    +  }
    +
    +  private String getMatchingField(Field[] fields, String columnName)
    +  {
    +    for (Field f : fields) {
    +      if (f.getName().equalsIgnoreCase(columnName)) {
    +        return f.getName();
    +      }
    +    }
    +    return null;
       }
     
       @Override
       public void deactivate()
       {
       }
     
    +  /**
    +   * {@inheritDoc} <br/>
    +   * If statement/query is not specified by user, insert query is constructed from fileInfo object and table name.
    +   */
       @Override
       protected PreparedStatement getUpdateCommand()
       {
    +    PreparedStatement statement;
    +    if (query == null) {
    +      statement = prepareStatementFromFieldsAndTableName();
    +    } else {
    +      statement = store.getSession().prepare(query);
    +    }
    +    LOG.debug("Statement is: " + statement.getQueryString());
    +    return statement;
    +  }
    +
    +  private PreparedStatement prepareStatementFromFieldsAndTableName()
    +  {
    +    if (tablename == null || tablename.length() == 0) {
    +      throw new RuntimeException("Please sepcify query or table name.");
    +    }
         StringBuilder queryfields = new StringBuilder();
         StringBuilder values = new StringBuilder();
    -    for (FieldInfo fieldInfo: fieldInfos) {
    +    for (FieldInfo fieldInfo : fieldInfos) {
    --- End diff --
    
    Please remove the checkstyle-related changes from this PR.
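
For readers following the diff above: when no query is configured, the operator matches table columns to POJO fields case-insensitively and builds an insert statement from the matched fields and the table name. The following is only a minimal, standalone sketch of that idea, not the code from the PR. The class `InsertQuerySketch`, the `User` POJO, the `buildInsertQuery` helper, and the exact query text are assumptions made for illustration, and the sketch returns a plain string instead of preparing a statement against a Cassandra session.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InsertQuerySketch
{
  // Hypothetical POJO, used only for this illustration.
  static class User
  {
    private int id;
    private String userName;
  }

  // Returns the name of the declared field that matches the column name,
  // ignoring case, or null when no field matches.
  static String getMatchingField(Field[] fields, String columnName)
  {
    for (Field f : fields) {
      if (f.getName().equalsIgnoreCase(columnName)) {
        return f.getName();
      }
    }
    return null;
  }

  // Hypothetical helper: builds a parameterized INSERT with one '?' per column.
  static String buildInsertQuery(String tablename, List<String> columns)
  {
    if (tablename == null || tablename.isEmpty()) {
      throw new IllegalArgumentException("Please specify query or table name.");
    }
    if (columns == null || columns.isEmpty()) {
      throw new IllegalArgumentException("At least one column is required.");
    }
    StringBuilder queryFields = new StringBuilder();
    StringBuilder values = new StringBuilder();
    for (String column : columns) {
      if (queryFields.length() > 0) {
        queryFields.append(", ");
        values.append(", ");
      }
      queryFields.append(column);
      values.append("?");
    }
    return "INSERT INTO " + tablename + " (" + queryFields + ") VALUES (" + values + ");";
  }

  public static void main(String[] args)
  {
    Field[] fields = User.class.getDeclaredFields();
    List<String> matched = new ArrayList<>();
    // "missing" has no counterpart in User, so it is skipped.
    for (String column : Arrays.asList("id", "username", "missing")) {
      if (getMatchingField(fields, column) != null) {
        matched.add(column);
      }
    }
    // prints: INSERT INTO users (id, username) VALUES (?, ?);
    System.out.println(buildInsertQuery("users", matched));
  }
}
```

In the operator itself, a string like this would presumably be handed to the session's `prepare` call, as the `store.getSession().prepare(query)` line in the diff does for a user-supplied query.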


> Cassandra output operator improvements
> --------------------------------------
>
>                 Key: APEXMALHAR-1966
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update, 
> insert or delete.
> 2. Add error port to emit tuples which couldn't be written to database.
> 3. Add metrics
> 4. Provide a way to restrict batch size



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
