Hi Michael,

I am not sure you understood my code correctly.

I am trying to implement the org.apache.spark.ml.Transformer interface in Java 8. My understanding is that the pseudocode for a transformer is something like:

@Override
public DataFrame transform(DataFrame df) {
    // 1. Select the input column
    // 2. Create a new column
    // 3. Append the new column to the df argument and return it
}
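That is the same contract the built-in feature transformers follow. For example (sketch only; "text" and "words" are placeholder column names, not my real schema), the stock Tokenizer appends its output column and hands back the original DataFrame plus that column:

import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;

// Built-in example of the contract: transform() returns the input DataFrame
// with one new column appended.
Tokenizer tokenizer = new Tokenizer()
    .setInputCol("text")     // placeholder column name
    .setOutputCol("words");  // placeholder column name
DataFrame withWords = tokenizer.transform(df);  // df plus a new "words" column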
Based on my experience the current DataFrame API is very limited: you cannot apply a complicated lambda function to a column. As a workaround I convert the DataFrame to a JavaRDD, apply my complicated lambda, and then convert the resulting RDD back to a DataFrame. Now I select the "new column" from that new DataFrame and try to call df.withColumn().
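Concretely, the body of my transform() looks roughly like this (trimmed down from the full StemmerTransformer code quoted at the bottom of this message):

// Workaround for the limited DataFrame API: drop down to an RDD, run the
// complicated per-row lambda there, then rebuild a DataFrame from the result.
JavaRDD<Row> inRows = df.select(inputCol).javaRDD();
JavaRDD<Row> outRows = inRows.map((Row row) -> {
    // ... complicated stemming logic would go here ...
    return RowFactory.create("TODO");
});

StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField(outputCol, DataTypes.StringType, true)));
DataFrame outputDF = sqlContext.createDataFrame(outRows, schema);

// This is the call that throws the AnalysisException quoted below.
return df.withColumn(outputCol, outputDF.col(outputCol));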
I can try to implement this as a UDF. However, I need to use several third-party jars. Any idea how to ensure the workers will have the required jar files? If I were submitting a normal Java app I would create an uber jar; will this work with UDFs?
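To make the jar question concrete, the UDF version would look something like the sketch below. StemmerLib is only a stand-in for the real third-party stemming class I need; it lives in one of those extra jars, which is why the workers have to be able to load it:

// Sketch only: StemmerLib is a placeholder for the third-party library class.
sqlContext.udf().register("stem", new UDF1<String, String>() {
    @Override
    public String call(String word) {
        return StemmerLib.stem(word);  // runs on the workers, so they need the jar
    }
}, DataTypes.StringType);

// callUDF comes from org.apache.spark.sql.functions
DataFrame transformed = df.withColumn(outputCol,
    callUDF("stem", df.col(inputCol)));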
Kind regards,

Andy


From: Michael Armbrust <mich...@databricks.com>
Date: Monday, January 4, 2016 at 11:14 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

> It's not really possible to convert an RDD to a Column. You can think of a
> Column as an expression that produces a single output given some set of
> input columns. If I understand your code correctly, I think this might be
> easier to express as a UDF:
>
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
>
> DataFrame transformed = df.withColumn("filteredInput", expr("stem(rawInput)"));
>
> On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> I am having a heck of a time writing a simple transformer in Java. I assume
>> that my Transformer is supposed to append a new column to the dataFrame
>> argument. Any idea why I get the following exception in Java 8 when I try
>> to call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a
>> new DataFrame by adding a column or replacing the existing column that has
>> the same name."
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
>>
>> Also, do transformers always run in the driver? If not, I assume the workers
>> do not have the sqlContext. Any idea how I can convert a JavaRDD<> to a
>> Column without a sqlContext?
>>
>> Kind regards
>>
>> Andy
>>
>> P.s. I am using Spark 1.6.0
>>
>> org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>>   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>>   at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
>>   at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
>>   at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
>>   at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
>>   at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>>
>> public class StemmerTransformer extends Transformer implements Serializable {
>>
>>     String inputCol;   // unit test sets this to rawInput
>>     String outputCol;  // unit test sets this to filteredOutput
>>     SQLContext sqlContext;
>>
>>     public StemmerTransformer(SQLContext sqlContext) {
>>         // will only work if transformers execute in the driver
>>         this.sqlContext = sqlContext;
>>     }
>>
>>     @Override
>>     public DataFrame transform(DataFrame df) {
>>         df.printSchema();
>>         df.show();
>>
>>         JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
>>         JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
>>             // TODO add stemming code
>>             // Create a new Row
>>             Row ret = RowFactory.create("TODO");
>>             return ret;
>>         });
>>
>>         // can we create a Col from a JavaRDD<Row>?
>>         List<StructField> fields = new ArrayList<StructField>();
>>         boolean nullable = true;
>>         fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, nullable));
>>
>>         StructType schema = DataTypes.createStructType(fields);
>>         DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
>>         outputDF.printSchema();
>>         outputDF.show();
>>
>>         Column newCol = outputDF.col(outputCol);
>>         return df.withColumn(outputCol, newCol);  // line 110, throws the AnalysisException above
>>     }
>> }
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
>> WARN 03:58:46 main o.a.h.u.NativeCodeLoader <clinit> line:62 Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>
>> root
>>  |-- rawInput: array (nullable = false)
>>  |    |-- element: string (containsNull = true)
>>
>> +--------------------+
>> |            rawInput|
>> +--------------------+
>> |[I, saw, the, red...|
>> |[Mary, had, a, li...|
>> |[greet, greeting,...|
>> +--------------------+
>>
>> root
>>  |-- filteredOutput: string (nullable = true)
>>
>> +--------------+
>> |filteredOutput|
>> +--------------+
>> |          TODO|
>> |          TODO|
>> |          TODO|
>> +--------------+