Very nice! Many thanks, Kevin. I wish I had found this out a couple of weeks ago.
Andy

From: Kevin Mellott <kevin.r.mell...@gmail.com>
Date: Wednesday, January 20, 2016 at 4:34 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

> Hi Andy,
>
> According to the API documentation for DataFrame
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
> you should have access to sqlContext as a property of the DataFrame
> instance. In your example, you could then do something like:
>
> df.sqlContext.udf.register(...)
>
> Thanks,
> Kevin
>
> On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> For clarity, callUDF() is not defined on DataFrame; it is defined on
>> org.apache.spark.sql.functions (strangely, the class name starts with a
>> lower-case letter). I have not figured out how to use the functions class.
>>
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>>
>> Andy
>>
>> From: Andrew Davidson <a...@santacruzintegration.com>
>> Date: Wednesday, January 20, 2016 at 4:05 PM
>> To: "user @spark" <user@spark.apache.org>
>> Subject: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem
>>
>>> I am using 1.6.0. I am having trouble implementing a custom transformer
>>> derived from org.apache.spark.ml.Transformer in Java that I can use in a
>>> Pipeline.
>>>
>>> So far the only way I have figured out how to implement any kind of complex
>>> functionality and have it applied to a DataFrame is to implement a UDF.
>>> For example:
>>>
>>> class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public List<String> call(String text) throws Exception {
>>>         List<String> ret = stemText(text); // calls org.apache.lucene
>>>         return ret;
>>>     }
>>> }
>>>
>>> Before I can use the UDF it needs to be registered. This requires the
>>> sqlContext. The problem is that sqlContext is not available during
>>> pipeline.load():
>>>
>>> void registerUDF(SQLContext sqlContext) {
>>>     if (udf == null) {
>>>         udf = new StemmerUDF();
>>>         DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>>>         sqlContext.udf().register(udfName, udf, returnType);
>>>     }
>>> }
>>>
>>> Our transformer needs to implement transform(). For it to be able to use the
>>> registered UDF, we need the sqlContext. The problem is that the sqlContext is
>>> not part of the signature of transform(). My current hack is to pass the
>>> sqlContext to the constructor and not use pipelines:
>>>
>>> @Override
>>> public DataFrame transform(DataFrame df) {
>>>     String fmt = "%s(%s) as %s";
>>>     String stmt = String.format(fmt, udfName, inputCol, outputCol);
>>>     logger.info("\nstmt: {}", stmt);
>>>     DataFrame ret = df.selectExpr("*", stmt);
>>>     return ret;
>>> }
>>>
>>> Is there a way to do something like df.callUDF(myUDF)?
>>>
>>> The following Scala code looks like it is close to what I need. I have not
>>> been able to figure out how to do something like this in Java 8; callUDF
>>> does not seem to be available.
>>>
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>>>
>>> @DeveloperApi
>>> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>>>     extends Transformer with HasInputCol with HasOutputCol with Logging {
>>>
>>>   . . .
>>>   override def transform(dataset: DataFrame): DataFrame = {
>>>     transformSchema(dataset.schema, logging = true)
>>>     dataset.withColumn($(outputCol),
>>>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>>>   }
>>>
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>>>
>>> class Tokenizer(override val uid: String)
>>>   extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {
>>>
>>>   . . .
>>>
>>>   override protected def createTransformFunc: String => Seq[String] = {
>>>     _.toLowerCase.split("\\s")
>>>   }
>>>
>>>   . . .
>>> }
>>>
>>> Kind regards
>>>
>>> Andy
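[Editor's note for readers following this thread: Kevin's tip resolves Andy's pipeline.load() problem because the DataFrame passed into transform() carries its own SQLContext, so the UDF can be registered lazily inside transform() rather than in the constructor. A minimal sketch of that idea is below. The class name, uid, and column names are hypothetical, and a toy lower-case/whitespace split stands in for the Lucene stemmer; this assumes Spark 1.6 on the classpath and is not a production-ready transformer (real code would expose inputCol/outputCol as Params so save()/load() can round-trip them).]

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class StemmerTransformer extends Transformer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical names; in real code these would be Params.
    private final String udfName = "stemmerUDF";
    private final String inputCol = "text";
    private final String outputCol = "words";

    // Toy stand-in for the Lucene-based stemmer in the original post.
    public static class StemmerUDF implements UDF1<String, List<String>>, Serializable {
        private static final long serialVersionUID = 1L;
        @Override
        public List<String> call(String text) throws Exception {
            return Arrays.asList(text.toLowerCase().split("\\s+"));
        }
    }

    @Override
    public String uid() {
        return "stemmer_b2d4"; // hypothetical fixed uid for the sketch
    }

    @Override
    public DataFrame transform(DataFrame df) {
        // Kevin's tip: the DataFrame carries its own SQLContext, so the UDF
        // can be registered here instead of in the constructor. No SQLContext
        // is needed at construction or pipeline.load() time.
        SQLContext sqlContext = df.sqlContext();
        sqlContext.udf().register(udfName, new StemmerUDF(),
                DataTypes.createArrayType(DataTypes.StringType));
        String stmt = String.format("%s(%s) as %s", udfName, inputCol, outputCol);
        return df.selectExpr("*", stmt);
    }

    @Override
    public StructType transformSchema(StructType schema) {
        // Append the output column so downstream pipeline stages see it.
        return schema.add(outputCol,
                DataTypes.createArrayType(DataTypes.StringType), true);
    }

    @Override
    public Transformer copy(ParamMap extra) {
        return this; // sketch only; a real transformer would copy its Params
    }
}
```

Registering the UDF on every transform() call is harmless, since register() simply overwrites the previous binding under the same name.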