Hello,

I am currently working on an application which simply reads data from a .txt file, 
splits it into words, and inserts them into MySQL, but I am getting an error. I am 
using JdbcPOJOOutputOperator from the Malhar library. The compiler reports that 
ActiveFieldInfo is protected in JdbcPOJOInputOperator, and I also 
do not know what to do in the Application.java file. I am attaching both files 
here. Kindly help me resolve the issue.

Thank You,

Jaikit Jilka
/**
 * Put your copyright and license info here.
 */
package com.mycompany.test;

import com.datatorrent.api.Context;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.google.common.collect.Lists;
import java.util.List;


@ApplicationAnnotation(name="MyWordCountApplication")
public class Application implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
   Parser parser = dag.addOperator("Input", new Parser());
   Counter<String> counter = dag.addOperator("Counter", new Counter<String>());
   JdbcPOJOOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator());

    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
    
    jdbcOutputOperator.setStore(outputStore);

    jdbcOutputOperator.setFieldInfos(addFieldInfos());
    
    dag.getMeta(jdbcOutputOperator).getMeta(jdbcOutputOperator.input).getAttributes().put(Context.PortContext.TUPLE_CLASS,);
   dag.addStream("Count", parser.outputPort, counter.InputCounter);
   dag.addStream("Output", counter.OutputCounter, jdbcOutputOperator.input);
   
  }
  
  private List<FieldInfo> addFieldInfos()
  {
    List<FieldInfo> fieldInfos = Lists.newArrayList();
    fieldInfos.add(new FieldInfo("count", "count", SupportType.INTEGER));
    fieldInfos.add(new FieldInfo("word", "word", SupportType.STRING));
    
    return fieldInfos;
  }
}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.mycompany.test;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.List;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
import com.datatorrent.lib.util.PojoUtils.GetterDouble;
import com.datatorrent.lib.util.PojoUtils.GetterFloat;
import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterShort;

/**
 * <p>
 * JdbcPOJOOutputOperator class.</p>
 * A Generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
 * <p>
 * For every configured {@link FieldInfo} the operator builds an
 * {@code INSERT INTO table (columns) VALUES (?, ...)} statement in {@link #setup}, looks up
 * the JDBC type of each column, and in {@link #activate} creates a matching getter on the
 * POJO class announced through the input port's {@code TUPLE_CLASS} attribute.
 * <p>
 * This copy keeps its own {@link ActiveFieldInfo} holder instead of using the one nested in
 * {@code JdbcPOJOInputOperator}: that class is reported as protected by the compiler and
 * therefore cannot be used from this package.
 *
 * @displayName Jdbc Output Operator
 * @category Output
 * @tags database, sql, pojo, jdbc
 * @since 2.1.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
    implements Operator.ActivationListener<OperatorContext>
{
  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);

  /**
   * Pairs a {@link FieldInfo} with the getter created for it in {@link #activate}.
   * {@code setterOrGetter} stays {@code null} until activation.
   */
  protected static class ActiveFieldInfo
  {
    public final FieldInfo fieldInfo;
    public Object setterOrGetter;

    public ActiveFieldInfo(FieldInfo fieldInfo)
    {
      this.fieldInfo = fieldInfo;
    }
  }

  @NotNull
  private List<FieldInfo> fieldInfos;

  /** JDBC type codes ({@link Types}) of the target columns, in {@link #fieldInfos} order. */
  private List<Integer> columnDataTypes;

  @NotNull
  private String tablename;

  /** One entry per field info, filled in {@link #setup}; getters are attached in {@link #activate}. */
  private final transient List<ActiveFieldInfo> columnFieldGetters;

  private String insertStatement;
  private transient Class<?> pojoClass;

  /**
   * Input port for POJO tuples. Its setup captures the port's {@code TUPLE_CLASS}
   * attribute; processing is delegated to the parent operator's input port.
   */
  @InputPortFieldAnnotation(optional = true, schemaRequired = true)
  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
  {
    @Override
    public void setup(Context.PortContext context)
    {
      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    }

    @Override
    public void process(Object t)
    {
      JdbcPOJOOutputOperator.super.input.process(t);
    }

  };

  public JdbcPOJOOutputOperator()
  {
    super();
    columnFieldGetters = Lists.newArrayList();
  }

  /**
   * Builds the insert statement from the configured field infos, runs the parent setup,
   * resolves the column data types if they are not known yet, and registers one
   * {@link ActiveFieldInfo} per field info.
   *
   * @param context the operator context passed on to the parent
   * @throws RuntimeException wrapping the {@link SQLException} if the column types
   *                          cannot be read
   */
  @Override
  public void setup(OperatorContext context)
  {
    StringBuilder columns = new StringBuilder();
    StringBuilder values = new StringBuilder();
    for (int i = 0; i < fieldInfos.size(); i++) {
      columns.append(fieldInfos.get(i).getColumnName());
      values.append("?");
      if (i < fieldInfos.size() - 1) {
        columns.append(",");
        values.append(",");
      }
    }
    insertStatement = "INSERT INTO "
            + tablename
            + " (" + columns.toString() + ")"
            + " VALUES (" + values.toString() + ")";
    LOG.debug("insert statement is {}", insertStatement);

    // The insert statement must exist before the parent setup runs, because the parent
    // obtains it through getUpdateCommand().
    super.setup(context);

    if (columnDataTypes == null) {
      try {
        populateColumnDataTypes(columns.toString());
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    for (FieldInfo fi : fieldInfos) {
      columnFieldGetters.add(new ActiveFieldInfo(fi));
    }
  }

  /**
   * Reads the JDBC type of each listed column from the result-set metadata of a probe
   * query against the target table and stores them in {@link #columnDataTypes}.
   *
   * @param columns comma-separated column names, in field-info order
   * @throws SQLException if the probe query or the metadata lookup fails
   */
  protected void populateColumnDataTypes(String columns) throws SQLException
  {
    // Collect into a local list so a failed probe does not leave a half-filled field.
    List<Integer> types = Lists.newArrayList();
    try (Statement st = store.getConnection().createStatement()) {
      // Only the metadata is needed, so do not pull the table's rows.
      st.setMaxRows(1);
      try (ResultSet rs = st.executeQuery("select " + columns + " from " + tablename)) {
        ResultSetMetaData rsMetaData = rs.getMetaData();
        final int columnCount = rsMetaData.getColumnCount();
        LOG.debug("resultSet MetaData column count {}", columnCount);

        for (int i = 1; i <= columnCount; i++) {
          int type = rsMetaData.getColumnType(i);
          types.add(type);
          LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
        }
      }
    }
    columnDataTypes = types;
  }

  /**
   * @return the insert statement built in {@link #setup}
   */
  @Override
  protected String getUpdateCommand()
  {
    LOG.debug("insert statement is {}", insertStatement);
    return insertStatement;
  }

  /**
   * Binds one statement parameter per column by invoking the getter created in
   * {@link #activate} on the tuple. TIMESTAMP, TIME and DATE columns are read from the
   * POJO as {@code long} values and wrapped in the corresponding {@code java.sql} type.
   *
   * @param statement the prepared insert statement
   * @param tuple     the POJO to read values from
   * @throws SQLException if a parameter cannot be set
   */
  @Override
  @SuppressWarnings("unchecked")
  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
  {
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
      final int type = columnDataTypes.get(i);
      final int paramIndex = i + 1; // JDBC parameters are 1-based
      ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
      switch (type) {
        case (Types.CHAR):
        case (Types.VARCHAR):
          statement.setString(paramIndex, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.BOOLEAN):
          statement.setBoolean(paramIndex, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.TINYINT):
          statement.setByte(paramIndex, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.SMALLINT):
          statement.setShort(paramIndex, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.INTEGER):
          statement.setInt(paramIndex, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.BIGINT):
          statement.setLong(paramIndex, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.FLOAT):
          statement.setFloat(paramIndex, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case (Types.DOUBLE):
          statement.setDouble(paramIndex, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case Types.DECIMAL:
          statement.setBigDecimal(paramIndex, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
          break;

        case Types.TIMESTAMP:
          statement.setTimestamp(paramIndex, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
          break;

        case Types.TIME:
          statement.setTime(paramIndex, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
          break;

        case Types.DATE:
          statement.setDate(paramIndex, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
          break;

        default:
          handleUnknownDataType(type, tuple, activeFieldInfo);
          break;
      }
    }
  }

  /**
   * Hook for column types this operator has no case for. The default implementation
   * always throws; subclasses may override it to support further types.
   *
   * @param type            the JDBC type code from {@link Types}
   * @param tuple           the tuple being written, or {@code null} when called from {@link #activate}
   * @param activeFieldInfo the field mapping for the affected column
   * @throws RuntimeException always, naming the unsupported type code
   */
  @SuppressWarnings("UnusedParameters")
  protected void handleUnknownDataType(int type, Object tuple, ActiveFieldInfo activeFieldInfo)
  {
    throw new RuntimeException("unsupported data type " + type);
  }

  /**
   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
   */
  public List<FieldInfo> getFieldInfos()
  {
    return fieldInfos;
  }

  /**
   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
   * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
   *
   * @description $[].columnName name of the database column name
   * @description $[].pojoFieldExpression pojo field name or expression
   * @useSchema $[].pojoFieldExpression input.fields[].name
   */
  public void setFieldInfos(List<FieldInfo> fieldInfos)
  {
    this.fieldInfos = fieldInfos;
  }

  /**
   * Gets the name of the table in database.
   */
  public String getTablename()
  {
    return tablename;
  }

  /**
   * Sets the name of the table in database.
   */
  public void setTablename(String tablename)
  {
    this.tablename = tablename;
  }

  /**
   * Creates, for every column, a getter on the POJO class whose kind matches the
   * column's JDBC type, and stores it in the column's {@link ActiveFieldInfo}.
   *
   * @param context the operator context (unused)
   */
  @Override
  public void activate(OperatorContext context)
  {
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
      final int type = columnDataTypes.get(i);
      ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
      final String expression = activeFieldInfo.fieldInfo.getPojoFieldExpression();
      switch (type) {
        case (Types.CHAR):
        case (Types.VARCHAR):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, expression, String.class);
          break;

        case (Types.BOOLEAN):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, expression);
          break;

        case (Types.TINYINT):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, expression);
          break;

        case (Types.SMALLINT):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, expression);
          break;

        case (Types.INTEGER):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, expression);
          break;

        // Temporal columns are read from the POJO as long values, exactly like
        // BIGINT; setStatementParameters wraps them in the java.sql type.
        case (Types.BIGINT):
        case Types.TIMESTAMP:
        case Types.TIME:
        case Types.DATE:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, expression);
          break;

        case (Types.FLOAT):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, expression);
          break;

        case (Types.DOUBLE):
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, expression);
          break;

        case Types.DECIMAL:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, expression, BigDecimal.class);
          break;

        default:
          handleUnknownDataType(type, null, activeFieldInfo);
          break;
      }
    }
  }

  /**
   * Nothing to release on deactivation.
   */
  @Override
  public void deactivate()
  {
  }
}

Reply via email to