Hi Andy,

According to the API documentation for DataFrame <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>, you should have access to *sqlContext* as a property off of the DataFrame instance. In your example, you could then do something like:
df.sqlContext.udf.register(...)

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <a...@santacruzintegration.com> wrote:

> For clarity, callUDF() is not defined on DataFrame. It is defined on
> org.apache.spark.sql.functions. Strangely, the class name starts with a
> lower-case letter. I have not figured out how to use the functions class.
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>
> Andy
>
> From: Andrew Davidson <a...@santacruzintegration.com>
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" <user@spark.apache.org>
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in
> a Pipeline.
>
> So far, the only way I have figured out to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF.
> For example:
>
>     class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public List<String> call(String text) throws Exception {
>             List<String> ret = stemText(text); // call org.apache.lucene
>             return ret;
>         }
>     }
>
> Before I can use the UDF it needs to be registered. This requires the
> sqlContext. *The problem is sqlContext is not available during
> pipeline.load()*
>
>     void registerUDF(SQLContext sqlContext) {
>         if (udf == null) {
>             udf = new StemmerUDF();
>             DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>             sqlContext.udf().register(udfName, udf, returnType);
>         }
>     }
>
> Our transformer needs to implement transform(). For it to be able to use
> the registered UDF we need the sqlContext.
> *The problem is that the sqlContext is not part of the signature of
> transform.* My current hack is to pass the sqlContext to the
> constructor and not to use pipelines.
>
>     @Override
>     public DataFrame transform(DataFrame df) {
>         String fmt = "%s(%s) as %s";
>         String stmt = String.format(fmt, udfName, inputCol, outputCol);
>         logger.info("\nstmt: {}", stmt);
>         DataFrame ret = df.selectExpr("*", stmt);
>         return ret;
>     }
>
> Is there a way to do something like df.callUDF(myUDF)?
>
> *The following Scala code looks like it is close to what I need. I have
> not been able to figure out how to do something like this in Java 8.
> callUDF does not seem to be available.*
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
>     @DeveloperApi
>     abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>       extends Transformer with HasInputCol with HasOutputCol with Logging {
>
>       . . .
>
>       override def transform(dataset: DataFrame): DataFrame = {
>         transformSchema(dataset.schema, logging = true)
>         dataset.withColumn($(outputCol),
>           callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>       }
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>     class Tokenizer(override val uid: String)
>       extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {
>
>       . . .
>
>       override protected def createTransformFunc: String => Seq[String] = {
>         _.toLowerCase.split("\\s")
>       }
>
>       . . .
>     }
>
> Kind regards
>
> Andy
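
As a rough Java 8 counterpart to the Tokenizer function quoted above, the per-row logic could be sketched in plain Java as follows. The class name TokenizerSketch and the method name tokenize are made up for illustration. Nothing here touches Spark, so it only covers the String => Seq[String] part, not the transformer wiring.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: a plain-Java version of the
// Scala function `_.toLowerCase.split("\\s")` from Tokenizer.scala.
class TokenizerSketch {

    // Lower-cases the input and splits it on single whitespace characters,
    // mirroring the Scala one-liner. Consecutive whitespace characters
    // produce empty tokens, as they do with the same regex in Scala.
    static List<String> tokenize(String text) {
        return Arrays.asList(text.toLowerCase().split("\\s"));
    }
}
```

To use something like this from a Java transformer, one option would be to wrap it in a UDF1<String, List<String>> and register it inside transform() via df.sqlContext().udf().register(...), following Kevin's suggestion. That part is left out here because it needs Spark on the classpath.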