Hi Andy,

According to the API documentation for DataFrame
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
you should have access to *sqlContext* as a property of the DataFrame
instance (from Java, via the sqlContext() accessor). In your example, you
could then do something like:

df.sqlContext().udf().register(...)
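
A rough sketch of how that could look inside your transformer, so that no
SQLContext needs to be passed to the constructor (untested, and it assumes
registerUDF() is the method from your example):

    @Override
    public DataFrame transform(DataFrame df) {
        // lazily register the UDF with the sqlContext carried by the
        // incoming DataFrame; nothing is needed at construction/load time
        registerUDF(df.sqlContext());

        String stmt = String.format("%s(%s) as %s", udfName, inputCol, outputCol);
        return df.selectExpr("*", stmt);
    }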

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> For clarity, callUDF() is not defined on DataFrame. It is defined on
> org.apache.spark.sql.functions (strangely, the class name starts with a
> lower-case letter). I have not figured out how to use the functions class.
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
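>
> My best guess at the usage, which I have not been able to verify, is a
> static import plus withColumn:
>
>    import static org.apache.spark.sql.functions.callUDF;
>
>    // sketch only: callUDF(String, Column...) should invoke a UDF that
>    // was already registered under udfName
>    DataFrame ret = df.withColumn(outputCol,
>                                  callUDF(udfName, df.col(inputCol)));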
>
> Andy
>
> From: Andrew Davidson <a...@santacruzintegration.com>
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" <user@spark.apache.org>
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in
> a Pipeline.
>
> So far the only way I have figured out how to implement any kind of
> complex functionality and apply it to a DataFrame is to implement a UDF.
> For example:
>
>
>    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>        private static final long serialVersionUID = 1L;
>
>        @Override
>        public List<String> call(String text) throws Exception {
>            return stemText(text); // stemText() calls org.apache.lucene
>        }
>    }
>
>
> Before the UDF can be used it needs to be registered, which requires the
> sqlContext. *The problem is that the sqlContext is not available during
> Pipeline.load().*
>
>    void registerUDF(SQLContext sqlContext) {
>        if (udf == null) {
>            udf = new StemmerUDF();
>            DataType returnType =
>                DataTypes.createArrayType(DataTypes.StringType);
>            sqlContext.udf().register(udfName, udf, returnType);
>        }
>    }
>
>
> Our transformer needs to implement transform(). For it to be able to use
> the registered UDF, we need the sqlContext. *The problem is that the
> sqlContext is not part of the signature of transform().* My current hack
> is to pass the sqlContext to the constructor (sketched after transform()
> below) and not use pipelines.
>
>    @Override
>    public DataFrame transform(DataFrame df) {
>        String fmt = "%s(%s) as %s";
>        String stmt = String.format(fmt, udfName, inputCol, outputCol);
>        logger.info("\nstmt: {}", stmt);
>        DataFrame ret = df.selectExpr("*", stmt);
>        return ret;
>    }
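>
> The hack looks roughly like this (StemmerTransformer is just a stand-in
> name for my transformer class):
>
>    // workaround: take the SQLContext as a constructor argument; this is
>    // exactly what keeps the transformer from being rebuilt by
>    // Pipeline.load(), which has no way to supply it
>    public StemmerTransformer(SQLContext sqlContext) {
>        registerUDF(sqlContext);
>    }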
>
>
> Is there a way to do something like df.callUDF(myUDF)?
>
>
> *The following Scala code looks like it is close to what I need. I have
> not been able to figure out how to do something like this in Java 8.
> callUDF does not seem to be available.*
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
> @DeveloperApi
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>
>   . . .
>
>   override def transform(dataset: DataFrame): DataFrame = {
>     transformSchema(dataset.schema, logging = true)
>     dataset.withColumn($(outputCol),
>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>   }
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>
> class Tokenizer(override val uid: String)
>   extends UnaryTransformer[String, Seq[String], Tokenizer]
>   with DefaultParamsWritable {
>
>   . . .
>
>   override protected def createTransformFunc: String => Seq[String] = {
>     _.toLowerCase.split("\\s")
>   }
>
>   . . .
> }
>
>
> Kind regards
>
>
> Andy
>
>
>
