I am having a heck of a time writing a simple Transformer in Java. I assume that my Transformer is supposed to append a new column to the DataFrame argument. Any idea why I get the following exception in Java 8 when I call DataFrame.withColumn()? The JavaDoc (http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html) says withColumn() "Returns a new DataFrame by adding a column or replacing the existing column that has the same name."
Also, do Transformers always run in the driver? If not, I assume the workers do not have the SQLContext. Any idea how I can convert a JavaRDD<Row> to a Column without an SQLContext?

Kind regards,

Andy

P.S. I am using Spark 1.6.0.

org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
    at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
    at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
    at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)

public class StemmerTransformer extends Transformer implements Serializable {

    SQLContext sqlContext;
    String inputCol;  // unit test sets to rawInput
    String outputCol; // unit test sets to filteredOutput

    public StemmerTransformer(SQLContext sqlContext) {
        // will only work if transformers execute in the driver
        this.sqlContext = sqlContext;
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
        JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
            // TODO add stemming code
            // Create a new Row
            Row ret = RowFactory.create("TODO");
            return ret;
        });

        // can we create a Col from a JavaRDD<Row>?
        List<StructField> fields = new ArrayList<StructField>();
        boolean nullable = true;
        fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, nullable));
        StructType schema = DataTypes.createStructType(fields);
        DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
        outputDF.printSchema();
        outputDF.show();

        Column newCol = outputDF.col(outputCol);
        return df.withColumn(outputCol, newCol);
    }
}
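For what it's worth, my reading of the analyzer error: the Column returned by outputDF.col(outputCol) is resolved against outputDF's plan, so df.withColumn() cannot resolve it against df (hence "filteredOutput#1 missing from rawInput#0"). One way to sidestep the JavaRDD-to-Column conversion entirely would be a UDF: transform() itself runs on the driver, and only the UDF body ships to the executors, so no SQLContext is needed on the workers. A minimal sketch under those assumptions; the class name StemmerUdfSketch, the UDF name "stemWords", and the join-the-words-with-spaces body are all illustrative placeholders, not real stemming logic:

```java
import java.io.Serializable;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.Seq;

public class StemmerUdfSketch implements Serializable {
    String inputCol = "rawInput";
    String outputCol = "filteredOutput";

    public DataFrame transform(SQLContext sqlContext, DataFrame df) {
        // Registration happens once, on the driver. Because rawInput is an
        // array<string> column, the Java UDF receives a scala Seq<String>.
        sqlContext.udf().register("stemWords",
            (UDF1<Seq<String>, String>) words -> {
                // TODO: real stemming; placeholder just rejoins the words
                return words.mkString(" ");
            },
            DataTypes.StringType);

        // The new column is an expression rooted at df.col(inputCol), so the
        // analyzer can resolve it -- unlike a Column borrowed from a second
        // DataFrame, which triggers the AnalysisException above.
        return df.withColumn(outputCol,
                functions.callUDF("stemWords", df.col(inputCol)));
    }
}
```

This avoids createDataFrame() inside transform(), so the transformer no longer needs to hold an SQLContext for anything the executors do.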
using builtin-java classes where applicable

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- filteredOutput: string (nullable = true)

+--------------+
|filteredOutput|
+--------------+
|          TODO|
|          TODO|
|          TODO|
+--------------+
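As for the TODO in the map function: a minimal suffix-stripping placeholder in plain Java, in case it helps show the shape of the per-word logic. This is a crude stand-in I made up (class name SimpleStemmer and the suffix list are mine), not the Porter/Snowball algorithm a real pipeline would use:

```java
public class SimpleStemmer {
    // Crude suffix stripping: handles only a few common English suffixes.
    // A real implementation would use a Porter or Snowball stemmer.
    public static String stem(String word) {
        String w = word.toLowerCase();
        String[] suffixes = { "ings", "ing", "ies", "ed", "s" };
        for (String suffix : suffixes) {
            // keep at least a 3-letter stem so e.g. "sing" is not cut to "s"
            if (w.endsWith(suffix) && w.length() - suffix.length() >= 3) {
                return w.substring(0, w.length() - suffix.length());
            }
        }
        return w;
    }

    public static void main(String[] args) {
        System.out.println(stem("greeting"));  // greet
        System.out.println(stem("greetings")); // greet
        System.out.println(stem("saw"));       // saw
    }
}
```

Something like this could be called per element inside the map (or inside the UDF body), independent of any SQLContext.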