I am having a heck of a time writing a simple Transformer in Java. I assume
that my Transformer is supposed to append a new column to the DataFrame
argument. Any idea why I get the following exception in Java 8 when I try to
call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
DataFrame
<http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
by adding a column or replacing the existing column that has the same name."


Also, do transformers always run in the driver? If not, I assume workers do
not have the sqlContext. Any idea how I can convert a JavaRDD<> to a Column
without a sqlContext?

Kind regards

Andy

P.S. I am using Spark 1.6.0

org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
    at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
    at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
    at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)



public class StemmerTransformer extends Transformer implements Serializable {

    String inputCol;  // unit test sets to rawInput
    String outputCol; // unit test sets to filteredOutput

    ...

    public StemmerTransformer(SQLContext sqlContext) {
        // will only work if transformers execute in the driver
        this.sqlContext = sqlContext;
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
        JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
            // TODO add stemming code
            // Create a new Row
            Row ret = RowFactory.create("TODO");
            return ret;
        });

        // can we create a Column from a JavaRDD<Row>?

        List<StructField> fields = new ArrayList<StructField>();
        boolean nullable = true;
        fields.add(DataTypes.createStructField(outputCol,
                DataTypes.StringType, nullable));

        StructType schema = DataTypes.createStructType(fields);
        DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
        outputDF.printSchema();
        outputDF.show();

        Column newCol = outputDF.col(outputCol);

        return df.withColumn(outputCol, newCol);
    }
}



SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
WARN  03:58:46 main o.a.h.u.NativeCodeLoader <clinit> line:62 Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- filteredOutput: string (nullable = true)

+--------------+
|filteredOutput|
+--------------+
|          TODO|
|          TODO|
|          TODO|
+--------------+
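
The exception names filteredOutput#1 as missing from rawInput#0: newCol was
taken from outputDF, and withColumn() can only resolve columns that belong to
the plan of the DataFrame it is called on. One possible way around this,
sketched here under assumptions, is to keep the per-row work as a plain
function and apply it as a UDF on df itself, for example registered with
sqlContext.udf().register(...) and applied through functions.callUDF(...).
The block below shows only the plain-Java per-row part. The class name
StemSketch and the suffix-stripping stem() are hypothetical placeholders,
not a real stemmer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StemSketch {

    // Hypothetical placeholder, NOT a real stemmer: strips a trailing
    // "ing" or "s" from longer tokens and leaves everything else alone.
    static String stem(String token) {
        if (token == null) {
            return null; // the rawInput schema allows null elements
        }
        if (token.endsWith("ing") && token.length() > 5) {
            return token.substring(0, token.length() - 3);
        }
        if (token.endsWith("s") && token.length() > 3) {
            return token.substring(0, token.length() - 1);
        }
        return token;
    }

    // Per-row logic: maps one rawInput value (a list of tokens) to the
    // value that would go into the output column for the same row.
    static List<String> stemAll(List<String> tokens) {
        List<String> out = new ArrayList<>(tokens.size());
        for (String t : tokens) {
            out.add(stem(t));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(stemAll(Arrays.asList("greet", "greeting", "greets")));
        // prints [greet, greet, greet]
    }
}
```

In Spark the array column would likely reach a Java UDF as a Scala sequence
rather than a java.util.List, so a conversion step would be needed before
calling stemAll(); that wiring is not shown here.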



