Hi Michael, I really appreciate your help. The following code works. Is there a way this example can be added to the distribution to make it easier for future Java programmers? It took me a long time to get to this simple solution.
I'll need to tweak this example a little to work with the new Pipeline save functionality. We need the current sqlContext to register our UDF; I'll see if I can pass it in the ParamMap. I'll throw an exception if someone uses transform(df) before it is set.

public class StemmerTransformer extends Transformer implements Serializable {
    private static final String udfName = "StemUDF";
    private SQLContext sqlContext; // must be set before transform() is called
    private UDF udf;

    void registerUDF() {
        if (udf == null) {
            udf = new UDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();
        registerUDF();
        DataFrame ret = df.selectExpr("*", "StemUDF(rawInput) as filteredOutput");
        return ret;
    }

    class UDF implements UDF1<WrappedArray<String>, List<String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(WrappedArray<String> wordsArg) throws Exception {
            List<String> words = JavaConversions.asJavaList(wordsArg);
            ArrayList<String> ret = new ArrayList<String>(words.size());
            for (String word : words) {
                // TODO replace test code with a real stemmer
                ret.add(word + "_stemed");
            }
            return ret;
        }
    }
}

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- filteredOutput: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------------------+---------------------------------------------------------------+
|rawInput                          |filteredOutput                                                 |
+----------------------------------+---------------------------------------------------------------+
|[I, saw, the, red, baloon]        |[I_stemed, saw_stemed, the_stemed, red_stemed, baloon_stemed]  |
|[Mary, had, a, little, lamb]      |[Mary_stemed, had_stemed, a_stemed, little_stemed, lamb_stemed]|
|[greet, greeting, greets, greeted]|[greet_stemed, greeting_stemed, greets_stemed, greeted_stemed] |
+----------------------------------+---------------------------------------------------------------+

From: Michael Armbrust <mich...@databricks.com>
Date: Tuesday, January 5, 2016 at 12:58 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I am trying to implement the org.apache.spark.ml.Transformer interface in
>> Java 8. My understanding is the pseudocode for transformers is something
>> like:
>>
>> @Override
>> public DataFrame transform(DataFrame df) {
>>     1. Select the input column
>>     2. Create a new column
>>     3. Append the new column to the df argument and return
>> }

> The following line can be used inside of the transform function to return a
> DataFrame that has been augmented with a new column using the stem lambda
> function (defined as a UDF below):
>
>     return df.withColumn("filteredInput", expr("stem(rawInput)"));
>
> This produces a new column called filteredInput (that is appended to
> whatever columns are already there) by passing the column rawInput to your
> arbitrary lambda function.

>> Based on my experience the current DataFrame API is very limited. You
>> cannot apply a complicated lambda function. As a workaround I convert the
>> data frame to a JavaRDD, apply my complicated lambda, and then convert the
>> resulting RDD back to a DataFrame.

> This is exactly what this code is doing. You are defining an arbitrary
> lambda function as a UDF. The difference here, when compared to a JavaRDD
> map, is that you can use this UDF to append columns without having to
> manually append the new data to some existing object.
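[Aside: the stemming logic inside the StemUDF earlier in this thread is plain Java, so it can be exercised without a SparkContext. A minimal stand-alone sketch — StemFunction is a hypothetical name, and the "_stemed" suffix is just the placeholder logic from the example, not a real stemmer:]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Stand-alone version of the UDF body: maps each word to word + "_stemed".
// Swap the loop body for a real stemming call before using this in a Transformer.
public class StemFunction implements Function<List<String>, List<String>> {
    @Override
    public List<String> apply(List<String> words) {
        List<String> ret = new ArrayList<>(words.size());
        for (String word : words) {
            ret.add(word + "_stemed"); // TODO replace with a real stemmer
        }
        return ret;
    }

    public static void main(String[] args) {
        List<String> out = new StemFunction().apply(Arrays.asList("red", "baloon"));
        System.out.println(out); // prints [red_stemed, baloon_stemed]
    }
}
```

[Because the lambda body is isolated like this, it can be unit-tested before it is wrapped in a Spark UDF1.]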
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>     @Override
>     public String call(String str) {
>         return str; // TODO: stemming code here
>     }
> }, DataTypes.StringType);

>> Now I select the "new column" from the DataFrame and try to call
>> df.withColumn().
>>
>> I can try and implement this as a UDF. However, I need to use several 3rd
>> party jars. Any idea how to ensure the workers will have the required jar
>> files? If I was submitting a normal Java app I would create an uber jar.
>> Will this work with UDFs?

> Yeah, UDFs are run the same way as your RDD lambda functions.
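[On the uber jar question: since UDF classes ship to the executors with the application jar, the same shaded/uber jar that works for a normal Spark application also covers the UDF's 3rd-party dependencies. A minimal sketch of a maven-shade-plugin pom.xml fragment — the version number is illustrative:]

```xml
<!-- Bundle 3rd-party jars into one uber jar; the spark-core and spark-sql
     dependencies should be marked <scope>provided</scope> elsewhere in the
     pom so they are not bundled into the shaded artifact. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

[Alternatively, passing the extra jars via spark-submit's --jars option distributes them to the executors without building an uber jar.]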