I am using Spark 1.6.0. I am having trouble implementing a custom transformer derived from org.apache.spark.ml.Transformer in Java that I can use in a Pipeline.
So far the only way I have figured out how to implement any kind of complex functionality and have it applied to a DataFrame is to implement a UDF. For example:

```java
class StemmerUDF implements UDF1<String, List<String>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(String text) throws Exception {
        List<String> ret = stemText(text); // call org.apache.lucene
        return ret;
    }
}
```

Before I can use the UDF it needs to be registered, which requires the sqlContext. The problem is that the sqlContext is not available during pipeline.load():

```java
void registerUDF(SQLContext sqlContext) {
    if (udf == null) {
        udf = new StemmerUDF();
        DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
        sqlContext.udf().register(udfName, udf, returnType);
    }
}
```

Our transformer needs to implement transform(). For it to be able to use the registered UDF, we need the sqlContext, but the sqlContext is not part of the signature of transform. My current hack is to pass the sqlContext to the constructor and not to use pipelines:

```java
@Override
public DataFrame transform(DataFrame df) {
    String fmt = "%s(%s) as %s";
    String stmt = String.format(fmt, udfName, inputCol, outputCol);
    logger.info("\nstmt: {}", stmt);
    DataFrame ret = df.selectExpr("*", stmt);
    return ret;
}
```

Is there a way to do something like df.callUDF(myUDF)?

The following Scala code looks like it is close to what I need, but I have not been able to figure out how to do something like this in Java 8. callUDF does not seem to be available.

spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala:

```scala
@DeveloperApi
abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
  extends Transformer with HasInputCol with HasOutputCol with Logging {
  . . .
  override def transform(dataset: DataFrame): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    dataset.withColumn($(outputCol),
      callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
  }
```

spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala:

```scala
class Tokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], Tokenizer]
  with DefaultParamsWritable {
  . . .
  override protected def createTransformFunc: String => Seq[String] = {
    _.toLowerCase.split("\\s")
  }
  . . .
}
```

Kind regards

Andy
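For what it's worth, here is a hedged sketch of the direction I have been exploring: DataFrame exposes the SQLContext it belongs to via df.sqlContext(), so the registration could be done lazily inside transform() instead of in the constructor, and the static org.apache.spark.sql.functions.callUDF(String, Column...) overload appears to be callable from Java in 1.6 to invoke a UDF registered by name. The class name StemmerTransformer and the udfName/inputCol/outputCol values below are my own placeholders, and it assumes the StemmerUDF class from above; I have not verified this end to end:

```java
import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Sketch only: StemmerUDF is the UDF1 class shown earlier in this post.
public class StemmerTransformer extends Transformer {
    private final String udfName = "stem";      // placeholder names
    private final String inputCol = "text";
    private final String outputCol = "words";

    @Override
    public String uid() {
        return "stemmer";
    }

    @Override
    public StemmerTransformer copy(ParamMap extra) {
        return this; // sketch: no Params to copy
    }

    @Override
    public DataFrame transform(DataFrame df) {
        // The DataFrame carries its own SQLContext, so no need to pass
        // one to the constructor; register the UDF lazily here.
        SQLContext sqlContext = df.sqlContext();
        sqlContext.udf().register(udfName, new StemmerUDF(),
                DataTypes.createArrayType(DataTypes.StringType));

        // functions.callUDF(name, cols...) invokes the registered UDF by name.
        return df.withColumn(outputCol, callUDF(udfName, df.col(inputCol)));
    }

    @Override
    public StructType transformSchema(StructType schema) {
        return schema.add(outputCol,
                DataTypes.createArrayType(DataTypes.StringType), true);
    }
}
```

This still relies on name-based registration rather than a df.callUDF(myUDF) style call, so I would be glad to hear of a cleaner approach.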