[
https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323501#comment-15323501
]
ASF GitHub Bot commented on APEXMALHAR-1966:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/295#discussion_r66532961
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
---
@@ -162,35 +160,75 @@ public void activate(Context.OperatorContext context)
}
getters.add(getter);
}
+ super.activate(context);
+ }
+
+ private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
+ {
+ fieldInfos = Lists.newArrayList();
+ Field[] fields = pojoClass.getDeclaredFields();
+ for (int i = 0; i < rsMetaData.size(); i++) {
+ String columnName = rsMetaData.getName(i);
+ String pojoField = getMatchingField(fields, columnName);
+ if (pojoField != null && pojoField.length() != 0) {
+ fieldInfos.add(new FieldInfo(columnName, pojoField, null));
+ } else {
+ LOG.error("Couldn't find corrosponding pojo field for column: " +
columnName);
+ }
+ }
+ }
+
+ private String getMatchingField(Field[] fields, String columnName)
+ {
+ for (Field f : fields) {
+ if (f.getName().equalsIgnoreCase(columnName)) {
+ return f.getName();
+ }
+ }
+ return null;
}
@Override
public void deactivate()
{
}
+ /**
+ * {@inheritDoc} <br/>
+ * If statement/query is not specified by user, insert query is
constructed from fileInfo object and table name.
+ */
@Override
protected PreparedStatement getUpdateCommand()
{
+ PreparedStatement statement;
+ if (query == null) {
+ statement = prepareStatementFromFieldsAndTableName();
+ } else {
+ statement = store.getSession().prepare(query);
+ }
+ LOG.debug("Statement is: " + statement.getQueryString());
+ return statement;
+ }
+
+ private PreparedStatement prepareStatementFromFieldsAndTableName()
+ {
+ if (tablename == null || tablename.length() == 0) {
+ throw new RuntimeException("Please sepcify query or table name.");
+ }
StringBuilder queryfields = new StringBuilder();
StringBuilder values = new StringBuilder();
- for (FieldInfo fieldInfo: fieldInfos) {
+ for (FieldInfo fieldInfo : fieldInfos) {
--- End diff --
Please remove checkstyle-related changes from this PR.
> Cassandra output operator improvements
> --------------------------------------
>
> Key: APEXMALHAR-1966
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update,
> insert or delete.
> 2. Add error port to emit tuples which couldn't be written to database.
> 3. Add metrics
> 4. Provide a way to restrict batch size
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)