For clarity, callUDF() is not defined on DataFrame. It is defined on
org.apache.spark.sql.functions. Strangely, the class name starts with a lower
case letter. I have not figured out how to use the functions class.

http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

Andy

From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Wednesday, January 20, 2016 at 4:05 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  trouble implementing  complex transformer in java that can be used
with Pipeline. Scala to Java porting problem

> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in a
> Pipeline.
> 
> So far the only way I have figured out to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF. For
> example:
> 
> 
>     class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>         private static final long serialVersionUID = 1L;
> 
>         @Override
>         public List<String> call(String text) throws Exception {
>             List<String> ret = stemText(text); // call org.apache.lucene
>             return ret;
>         }
>     }
> 
> 
> 
> Before I can use the UDF it needs to be registered. This requires the
> sqlContext. The problem is that the sqlContext is not available during
> pipeline.load().
> 
> 
>     void registerUDF(SQLContext sqlContext) {
>         if (udf == null) {
>             udf = new StemmerUDF();
>             DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>             sqlContext.udf().register(udfName, udf, returnType);
>         }
>     }
> 
> 
> Our transformer needs to implement transform(). For it to be able to use the
> registered UDF we need the sqlContext. The problem is that the sqlContext is
> not part of the signature of transform(). My current hack is to pass the
> sqlContext to the constructor and not use pipelines:
> 
>     @Override
>     public DataFrame transform(DataFrame df) {
>         String fmt = "%s(%s) as %s";
>         String stmt = String.format(fmt, udfName, inputCol, outputCol);
>         logger.info("\nstmt: {}", stmt);
>         DataFrame ret = df.selectExpr("*", stmt);
>         return ret;
>     }
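
For reference, here is a minimal stand-alone sketch of the expression string that the transform() above hands to selectExpr. It does not need Spark on the classpath. The class name SelectExprStatement and the names "stemmer", "text" and "stems" are made up for illustration; they are not from the original post.

```java
class SelectExprStatement {
    // Same format string as in the quoted transform() method.
    static final String FMT = "%s(%s) as %s";

    // Builds the SQL expression that applies a registered UDF to inputCol
    // and aliases the result as outputCol.
    static String build(String udfName, String inputCol, String outputCol) {
        return String.format(FMT, udfName, inputCol, outputCol);
    }

    public static void main(String[] args) {
        // Hypothetical UDF and column names, for illustration only.
        System.out.println(build("stemmer", "text", "stems"));
        // prints: stemmer(text) as stems
    }
}
```

Because the UDF is referenced only by name inside this string, it must already be registered under that name on the SQLContext that owns the DataFrame.
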
> 
> 
> 
> Is there a way to do something like df.callUDF(myUDF)?
> 
> 
> 
> The following Scala code looks close to what I need. I have not been able
> to figure out how to do something like this in Java 8. callUDF does not seem
> to be available.
> 
> 
> 
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
> 
> @DeveloperApi
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
> 
> . . .
> 
>   override def transform(dataset: DataFrame): DataFrame = {
>     transformSchema(dataset.schema, logging = true)
>     dataset.withColumn($(outputCol),
>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>   }
> 
> 
> 
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
> 
> 
> 
> class Tokenizer(override val uid: String)
>   extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {
> 
> . . .
> 
>   override protected def createTransformFunc: String => Seq[String] = {
>     _.toLowerCase.split("\\s")
>   }
> 
> . . .
> 
> }
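
As a small porting aid, the body of createTransformFunc above translates to plain Java almost word for word, since the Scala call to split with a String argument is java.lang.String.split. This is only a sketch of that one function, without any Spark types; the class name TokenizeFunc and the sample input are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class TokenizeFunc {
    // Java equivalent of the Scala body `_.toLowerCase.split("\\s")`.
    // The pattern matches one whitespace character at a time, so runs of
    // whitespace inside the text yield empty tokens, as in the Scala version.
    static List<String> tokenize(String text) {
        return Arrays.asList(text.toLowerCase().split("\\s"));
    }

    public static void main(String[] args) {
        // Hypothetical input, for illustration only.
        System.out.println(tokenize("Hello Spark ML"));
        // prints: [hello, spark, ml]
    }
}
```

A function like this could serve as the body of a UDF1.call() implementation, in the same way StemmerUDF wraps stemText().
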
> 
> 
> 
> Kind regards
> 
> 
> 
> Andy
> 
> 

