Very nice! Many thanks, Kevin. I wish I had found this out a couple of weeks
ago.

Andy

From:  Kevin Mellott <kevin.r.mell...@gmail.com>
Date:  Wednesday, January 20, 2016 at 4:34 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble implementing complex transformer in java that can be
used with Pipeline. Scala to Java porting problem

> Hi Andy,
> 
> According to the API documentation for DataFrame
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>, you should have access to sqlContext as a property off of the
> DataFrame instance. In your example, you could then do something like:
> 
> df.sqlContext.udf.register(...)
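> 
> In Java the equivalent is a method call rather than a property. A minimal
> sketch, reusing the StemmerUDF and return type from your code below ("stem"
> is just an example name):
> 
>     DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>     df.sqlContext().udf().register("stem", new StemmerUDF(), returnType);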
> 
> Thanks,
> Kevin
> 
> On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> For clarity: callUDF() is not defined on DataFrame; it is defined on
>> org.apache.spark.sql.functions. Strangely, the class name starts with a
>> lower-case letter. I have not figured out how to use the functions class.
>> 
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
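>> 
>> My best guess at how it would be used from Java, assuming the UDF has
>> already been registered under udfName (an untested sketch):
>> 
>>     import static org.apache.spark.sql.functions.callUDF;
>> 
>>     DataFrame ret = df.withColumn(outputCol, callUDF(udfName, df.col(inputCol)));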
>> 
>> Andy
>> 
>> From:  Andrew Davidson <a...@santacruzintegration.com>
>> Date:  Wednesday, January 20, 2016 at 4:05 PM
>> To:  "user @spark" <user@spark.apache.org>
>> Subject:  trouble implementing complex transformer in java that can be used
>> with Pipeline. Scala to Java porting problem
>> 
>>> I am using 1.6.0. I am having trouble implementing a custom transformer
>>> derived from org.apache.spark.ml.Transformer in Java that I can use in a
>>> Pipeline.
>>> 
>>> So far the only way I have figured out how to implement any kind of complex
>>> functionality and apply it to a DataFrame is to implement a UDF. For
>>> example:
>>> 
>>> 
>>>    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>>>        private static final long serialVersionUID = 1L;
>>>
>>>        @Override
>>>        public List<String> call(String text) throws Exception {
>>>            List<String> ret = stemText(text); // stemText() calls org.apache.lucene
>>>            return ret;
>>>        }
>>>    }
>>> 
>>> 
>>> 
>>> Before I can use the UDF it needs to be registered, which requires the
>>> sqlContext. The problem is that the sqlContext is not available during
>>> pipeline.load():
>>> 
>>> 
>>>    void registerUDF(SQLContext sqlContext) {
>>>        if (udf == null) {
>>>            udf = new StemmerUDF();
>>>            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>>>            sqlContext.udf().register(udfName, udf, returnType);
>>>        }
>>>    }
>>> 
>>> 
>>> Our transformer needs to implement transform(). For it to be able to use the
>>> registered UDF we need the sqlContext, but the sqlContext is not part of the
>>> signature of transform(). My current hack is to pass the sqlContext to the
>>> constructor and not to use pipelines:
>>>    @Override
>>>    public DataFrame transform(DataFrame df) {
>>>        String fmt = "%s(%s) as %s";
>>>        String stmt = String.format(fmt, udfName, inputCol, outputCol);
>>>        logger.info("\nstmt: {}", stmt);
>>>        DataFrame ret = df.selectExpr("*", stmt);
>>>        return ret;
>>>    }
>>> 
>>> 
>>> 
>>> Is there a way to do something like df.callUDF(myUDF)?
>>> 
>>> 
>>> 
>>> The following Scala code looks like it is close to what I need. I have not
>>> been able to figure out how to do something like this in Java 8; callUDF
>>> does not seem to be available.
>>> 
>>> 
>>> 
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>>> 
>>> @DeveloperApi
>>> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>>>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>>>
>>> . . .
>>>
>>>   override def transform(dataset: DataFrame): DataFrame = {
>>>     transformSchema(dataset.schema, logging = true)
>>>     dataset.withColumn($(outputCol),
>>>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>>>   }
>>> 
>>> 
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>>> 
>>> 
>>> 
>>> class Tokenizer(override val uid: String)
>>>   extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {
>>>
>>> . . .
>>>
>>>   override protected def createTransformFunc: String => Seq[String] = {
>>>     _.toLowerCase.split("\\s")
>>>   }
>>>
>>> . . .
>>> }
>>> 
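>>> If the callUDF(String, Column...) overload on org.apache.spark.sql.functions
>>> behaves the way I think it does, the Java 8 equivalent of the Scala
>>> transform() above might look something like this sketch (untested; it
>>> assumes the sqlContext can be obtained from the DataFrame itself so the UDF
>>> can be registered lazily):
>>>
>>>     @Override
>>>     public DataFrame transform(DataFrame df) {
>>>         registerUDF(df.sqlContext()); // registerUDF() from above; no-op after first call
>>>         return df.withColumn(outputCol,
>>>                 functions.callUDF(udfName, df.col(inputCol)));
>>>     }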
>>> 
>>> 
>>> Kind regards
>>> 
>>> 
>>> 
>>> Andy
>>> 
>>> 
> 

