Hi Michael

I really appreciate your help. The following code works. Is there a way
this example can be added to the distribution to make it easier for future
Java programmers? It took me a long time to get to this simple solution.

I'll need to tweak this example a little to work with the new Pipeline save
functionality. We need the current sqlContext to register our UDF. I'll see
if I can pass this in the Param Map, and I'll throw an exception if someone
uses transform(df).
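
Something along these lines is what I have in mind for the exception (just a
sketch, not the Param Map version yet; the requireSqlContext() helper and its
message are made up). transform() would call it before registerUDF():

    // Sketch only: inside StemmerTransformer, fail fast when transform(df)
    // is called before a SQLContext has been supplied.
    private void requireSqlContext() {
        if (sqlContext == null) {
            throw new IllegalStateException(
                    "StemmerTransformer needs a SQLContext to register its UDF; "
                            + "supply one before calling transform()");
        }
    }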

// Imports used by this class:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.ml.Transformer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

public class StemmerTransformer extends Transformer implements Serializable {

    // The SQLContext is needed to register the UDF. NOTE: how it is supplied
    // is an assumption here; a constructor is used for illustration (see the
    // Param Map note above). It is transient because SQLContext itself is not
    // serializable.
    private transient SQLContext sqlContext;
    private final String udfName = "StemUDF";
    private UDF udf;

    public StemmerTransformer(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    // Register the UDF lazily, the first time transform() runs.
    void registerUDF() {
        if (udf == null) {
            udf = new UDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        registerUDF();

        // Keep every existing column and append the stemmed column.
        DataFrame ret = df.selectExpr("*", "StemUDF(rawInput) as filteredOutput");
        return ret;
    }

    // (uid(), copy(), and transformSchema(), also required by Transformer,
    // are omitted from this excerpt.)

    class UDF implements UDF1<WrappedArray<String>, List<String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(WrappedArray<String> wordsArg) throws Exception {
            List<String> words = JavaConversions.asJavaList(wordsArg);
            ArrayList<String> ret = new ArrayList<String>(words.size());
            for (String word : words) {
                // TODO: replace this test code with real stemming
                ret.add(word + "_stemed");
            }
            return ret;
        }
    }
}
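
For reference, here is roughly how I drive it (just a sketch; the example
class, the variable names, and the constructor taking a SQLContext are my own
assumptions, and the sample rows simply match the data below). The first
schema and table come from the printSchema()/show() calls inside transform();
the second come from the returned DataFrame:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class StemmerTransformerExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("StemmerTransformerExample");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Three sample rows, each with a single array<string> column named rawInput.
        List<Row> rows = Arrays.asList(
                RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
                RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb")),
                RowFactory.create(Arrays.asList("greet", "greeting", "greets", "greeted")));
        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("rawInput",
                        DataTypes.createArrayType(DataTypes.StringType), false) });
        JavaRDD<Row> rowRDD = jsc.parallelize(rows);
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

        // transform() prints the input schema/rows, registers the UDF, and
        // appends the filteredOutput column.
        DataFrame transformed = new StemmerTransformer(sqlContext).transform(df);
        transformed.printSchema();
        transformed.show(false);   // false = do not truncate the array values

        jsc.stop();
    }
}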



root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- filteredOutput: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------------------+---------------------------------------------------------------+
|rawInput                          |filteredOutput                                                 |
+----------------------------------+---------------------------------------------------------------+
|[I, saw, the, red, baloon]        |[I_stemed, saw_stemed, the_stemed, red_stemed, baloon_stemed]  |
|[Mary, had, a, little, lamb]      |[Mary_stemed, had_stemed, a_stemed, little_stemed, lamb_stemed]|
|[greet, greeting, greets, greeted]|[greet_stemed, greeting_stemed, greets_stemed, greeted_stemed] |
+----------------------------------+---------------------------------------------------------------+



From:  Michael Armbrust <mich...@databricks.com>
Date:  Tuesday, January 5, 2016 at 12:58 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I am trying to implement the org.apache.spark.ml.Transformer interface in Java 8.
>> My understanding is the pseudo code for transformers is something like
>> @Override
>> 
>>     public DataFrame transform(DataFrame df) {
>> 
>> 1. Select the input column
>> 
>> 2. Create a new column
>> 
>> 3. Append the new column to the df argument and return
>> 
>>    }
> 
> 
> The following line can be used inside of the transform function to return a
> DataFrame that has been augmented with a new column using the stem lambda
> function (defined as a UDF below).
> return df.withColumn("filteredInput", expr("stem(rawInput)"));
> This is producing a new column called filteredInput (that is appended to
> whatever columns are already there) by passing the column rawInput to your
> arbitrary lambda function.
>  
>> Based on my experience the current DataFrame api is very limited. You cannot
>> apply a complicated lambda function. As a workaround I convert the data
>> frame to a JavaRDD, apply my complicated lambda, and then convert the
>> resulting RDD back to a DataFrame.
> 
> 
> This is exactly what this code is doing.  You are defining an arbitrary lambda
> function as a UDF.  The difference here, when compared to a JavaRDD map, is
> that you can use this UDF to append columns without having to manually append
> the new data to some existing object.
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
>> Now I select the "new column" from the DataFrame and try to call
>> df.withColumn().
>> 
>> I can try and implement this as a UDF. However, I need to use several 3rd
>> party jars. Any idea how to ensure the workers will have the required jar
>> files? If I was submitting a normal java app I would create an uber jar;
>> will this work with UDFs?
> 
> 
> Yeah, UDFs are run the same way as your RDD lambda functions.

