Hi Michael,

I am not sure you understood my code correctly.

I am trying to implement the org.apache.spark.ml.Transformer interface in
Java 8.


My understanding is the pseudocode for a transformer is something like:

@Override
public DataFrame transform(DataFrame df) {
    // 1. Select the input column
    // 2. Create a new column
    // 3. Append the new column to the df argument and return it
}



Based on my experience, the current DataFrame API is very limited. You
cannot apply a complicated lambda function. As a workaround, I convert the
DataFrame to a JavaRDD, apply my complicated lambda, and then convert the
resulting RDD back to a DataFrame; a sketch of the round trip follows.
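
The sketch below shows the round trip. It is illustrative only: I have
simplified the input column to a plain string, and toLowerCase() stands in
for my real (much more complicated) lambda.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RoundTripSketch {

    // DataFrame -> JavaRDD -> DataFrame round trip (Spark 1.6 APIs).
    // Selects the input column, applies a per-row function the Column DSL
    // cannot express, and rebuilds a single-column DataFrame from the result.
    public static DataFrame applyLambda(SQLContext sqlContext, DataFrame df) {
        JavaRDD<Row> outRows = df.select("rawInput").javaRDD().map(
                (Row row) -> RowFactory.create(row.getString(0).toLowerCase()));

        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("filteredOutput",
                DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        return sqlContext.createDataFrame(outRows, schema);
    }
}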



Now I select the "new column" from that DataFrame and try to call
df.withColumn().



I could try to implement this as a UDF. However, I need to use several 3rd
party jars. Any idea how to ensure the workers will have the required jar
files? If I were submitting a normal Java app I would create an uber jar;
will this work with UDFs?
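
For reference, this is roughly what I do today for a normal app (the class
name, app name, and jar path below are made up for illustration): build an
uber jar, e.g. with the Maven shade plugin, and register it on the SparkConf
so the workers fetch it.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SubmitSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("StemmerPOC")
                // uber jar containing my code plus the 3rd party classes
                .setJars(new String[] { "target/my-app-uber.jar" });
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... create the SQLContext, register UDFs, run the job ...
        jsc.stop();
    }
}

My question is whether the same uber jar approach makes the 3rd party
classes visible to the UDF when it runs on the workers.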



Kind regards



Andy



From:  Michael Armbrust <mich...@databricks.com>
Date:  Monday, January 4, 2016 at 11:14 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

> It's not really possible to convert an RDD to a Column. You can think of a
> Column as an expression that produces a single output given some set of input
> columns. If I understand your code correctly, I think this might be easier to
> express as a UDF:
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return str; // TODO: stemming code here
>   }
> }, DataTypes.StringType);
> 
> // requires: import static org.apache.spark.sql.functions.expr;
> DataFrame transformed = df.withColumn("filteredInput", expr("stem(rawInput)"));
> 
> On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am having a heck of a time writing a simple transformer in Java. I assume
>> that my Transformer is supposed to append a new column to the DataFrame
>> argument. Any idea why I get the following exception in Java 8 when I try to
>> call DataFrame.withColumn()? The JavaDoc
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
>> says withColumn() "Returns a new DataFrame by adding a column or replacing
>> the existing column that has the same name."
>> 
>> 
>> Also, do transformers always run in the driver? If not, I assume the workers
>> do not have the sqlContext. Any idea how I can convert a JavaRDD<> to a
>> Column without a sqlContext?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> P.S. I am using Spark 1.6.0.
>> 
>> org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>> at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
>> at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
>> at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
>> at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
>> at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>> 
>> 
>> 
>> public class StemmerTransformer extends Transformer implements Serializable {
>> 
>>     String inputCol;  // unit test sets to rawInput
>>     String outputCol; // unit test sets to filteredOutput
>> 
>>     ...
>> 
>>     public StemmerTransformer(SQLContext sqlContext) {
>>         // will only work if transformers execute in the driver
>>         this.sqlContext = sqlContext;
>>     }
>> 
>>     @Override
>>     public DataFrame transform(DataFrame df) {
>>         df.printSchema();
>>         df.show();
>> 
>>         JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
>>         JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
>>             // TODO add stemming code; for now create a placeholder Row
>>             Row ret = RowFactory.create("TODO");
>>             return ret;
>>         });
>> 
>>         // can we create a Col from a JavaRDD<Row>?
>>         List<StructField> fields = new ArrayList<StructField>();
>>         boolean nullable = true;
>>         fields.add(DataTypes.createStructField(outputCol,
>>                 DataTypes.StringType, nullable));
>>         StructType schema = DataTypes.createStructType(fields);
>> 
>>         DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
>>         outputDF.printSchema();
>>         outputDF.show();
>> 
>>         Column newCol = outputDF.col(outputCol);
>> 
>>         // line 110: the withColumn() call below is what throws the
>>         // AnalysisException shown above
>>         return df.withColumn(outputCol, newCol);
>>     }
>> 
>> 
>> 
>> root
>>  |-- rawInput: array (nullable = false)
>>  |    |-- element: string (containsNull = true)
>> 
>> +--------------------+
>> |            rawInput|
>> +--------------------+
>> |[I, saw, the, red...|
>> |[Mary, had, a, li...|
>> |[greet, greeting,...|
>> +--------------------+
>> 
>> root
>>  |-- filteredOutput: string (nullable = true)
>> 
>> +--------------+
>> |filteredOutput|
>> +--------------+
>> |          TODO|
>> |          TODO|
>> |          TODO|
>> +--------------+
>> 
>> 
> 

