I am using Spark 1.6.0. I am having trouble implementing a custom transformer
derived from org.apache.spark.ml.Transformer in Java that I can use in a
Pipeline.

So far the only way I have figured out to implement any kind of complex
functionality and apply it to a DataFrame is to implement a UDF. For
example:


    import java.io.Serializable;
    import java.util.List;

    import org.apache.spark.sql.api.java.UDF1;

    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(String text) throws Exception {
            return stemText(text); // calls into org.apache.lucene
        }
    }



Before I can use the UDF it needs to be registered, which requires the
SQLContext. The problem is that the SQLContext is not available during
pipeline.load():


    void registerUDF(SQLContext sqlContext) {
        if (udf == null) {
            udf = new StemmerUDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }


Our transformer needs to implement transform(). For it to use the
registered UDF we need the SQLContext, but the SQLContext is not part of
the signature of transform(). My current hack is to pass the SQLContext to
the constructor and not use pipelines:
    @Override
    public DataFrame transform(DataFrame df) {
        String fmt = "%s(%s) as %s";
        String stmt = String.format(fmt, udfName, inputCol, outputCol);
        logger.info("\nstmt: {}", stmt);
        return df.selectExpr("*", stmt);
    }
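
One workaround that might avoid the constructor hack (only a sketch, not
tested against pipeline.load()): the DataFrame handed to transform()
carries its own SQLContext via df.sqlContext(), so the UDF could be
registered lazily on first use, using the registerUDF() from above:

    @Override
    public DataFrame transform(DataFrame df) {
        // df.sqlContext() gives us the context at transform time,
        // so we no longer need it at construction/load time.
        registerUDF(df.sqlContext());
        String stmt = String.format("%s(%s) as %s", udfName, inputCol, outputCol);
        return df.selectExpr("*", stmt);
    }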



Is there a way to do something like df.callUDF(myUDF)?
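
The closest thing I can find in the Java API is the static
org.apache.spark.sql.functions.callUDF(String, Column...), which invokes a
UDF by its registered name. A minimal sketch, reusing udfName, inputCol,
and outputCol from my transformer above:

    import static org.apache.spark.sql.functions.callUDF;

    // Appends outputCol computed by the registered UDF over inputCol.
    DataFrame ret = df.withColumn(outputCol, callUDF(udfName, df.col(inputCol)));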



The following Scala code looks like it is close to what I need, but I have
not been able to figure out how to do something like this in Java 8;
callUDF does not seem to be available there. (A rough Java sketch of the
same pattern follows the quoted code below.)



spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala

@DeveloperApi
abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
  extends Transformer with HasInputCol with HasOutputCol with Logging {

. . .

  override def transform(dataset: DataFrame): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    dataset.withColumn($(outputCol),
      callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
  }



spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala



class Tokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {

. . .

  override protected def createTransformFunc: String => Seq[String] = {
    _.toLowerCase.split("\\s")
  }

. . .
}
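
Putting the pieces together, here is a rough Java 8 sketch of the same
pattern, combining lazy registration with functions.callUDF. This is only
a sketch: inputCol/outputCol are plain fields rather than real Params, the
save/load (DefaultParamsWritable) side is omitted, and StemmerUDF is the
class from above:

    import java.util.UUID;

    import org.apache.spark.ml.Transformer;
    import org.apache.spark.ml.param.ParamMap;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class Stemmer extends Transformer {
        private final String uid = "stemmer_" + UUID.randomUUID().toString().substring(0, 8);
        private final String udfName = uid + "_udf"; // unique name per instance
        private String inputCol = "text";   // plain fields; real Param plumbing omitted
        private String outputCol = "stems";
        private transient boolean registered = false; // re-register after deserialization

        @Override
        public String uid() {
            return uid;
        }

        @Override
        public DataFrame transform(DataFrame df) {
            if (!registered) {
                // The DataFrame carries its SQLContext, so registration
                // can happen lazily at transform time.
                df.sqlContext().udf().register(udfName, new StemmerUDF(),
                        DataTypes.createArrayType(DataTypes.StringType));
                registered = true;
            }
            return df.withColumn(outputCol,
                    functions.callUDF(udfName, df.col(inputCol)));
        }

        @Override
        public StructType transformSchema(StructType schema) {
            // Append the output column to the schema.
            return schema.add(outputCol,
                    DataTypes.createArrayType(DataTypes.StringType), true);
        }

        @Override
        public Transformer copy(ParamMap extra) {
            Stemmer copy = new Stemmer();
            copy.inputCol = inputCol;
            copy.outputCol = outputCol;
            return copy;
        }
    }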



Kind regards



Andy



