Repository: spark
Updated Branches:
  refs/heads/master 9fa703e89 -> d5861aba9


[SPARK-22945][SQL] add java UDF APIs in the functions object

## What changes were proposed in this pull request?

Currently Scala users can use UDF like
```
val foo = udf((i: Int) => Math.random() + i).asNondeterministic
df.select(foo('a))
```
Python users can also do it with similar APIs. However, Java users can't, so
we should add Java UDF APIs to the `functions` object.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20141 from cloud-fan/udf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5861aba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5861aba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5861aba

Branch: refs/heads/master
Commit: d5861aba9d80ca15ad3f22793b79822e470d6913
Parents: 9fa703e
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Jan 4 19:17:22 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jan 4 19:17:22 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/UDFRegistration.scala  |  90 +++---
 .../sql/expressions/UserDefinedFunction.scala   |   1 +
 .../scala/org/apache/spark/sql/functions.scala  | 313 +++++++++++++++----
 .../apache/spark/sql/JavaDataFrameSuite.java    |  11 +
 .../scala/org/apache/spark/sql/UDFSuite.scala   |  12 +-
 5 files changed, 315 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index dc2468a..f94baef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.lang.reflect.{ParameterizedType, Type}
+import java.lang.reflect.ParameterizedType
 
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Try
@@ -110,29 +110,29 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
 
   /* register 0-22 were generated by this script
 
-    (0 to 22).map { x =>
+    (0 to 22).foreach { x =>
       val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
-      val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: 
TypeTag")(_ + ", " + _)
+      val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: 
TypeTag")(_ + ", " + _)
       val inputTypes = (1 to x).foldRight("Nil")((i, s) => 
{s"ScalaReflection.schemaFor[A$i].dataType :: $s"})
       println(s"""
-        /**
-         * Registers a deterministic Scala closure of ${x} arguments as 
user-defined function (UDF).
-         * @tparam RT return type of UDF.
-         * @since 1.3.0
-         */
-        def register[$typeTags](name: String, func: Function$x[$types]): 
UserDefinedFunction = {
-          val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
-          val inputTypes = Try($inputTypes).toOption
-          def builder(e: Seq[Expression]) = if (e.length == $x) {
-            ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), 
nullable, udfDeterministic = true)
-          } else {
-             throw new AnalysisException("Invalid number of arguments for 
function " + name +
-               ". Expected: $x; Found: " + e.length)
-          }
-          functionRegistry.createOrReplaceTempFunction(name, builder)
-          val udf = UserDefinedFunction(func, dataType, 
inputTypes).withName(name)
-          if (nullable) udf else udf.asNonNullable()
-        }""")
+        |/**
+        | * Registers a deterministic Scala closure of $x arguments as 
user-defined function (UDF).
+        | * @tparam RT return type of UDF.
+        | * @since 1.3.0
+        | */
+        |def register[$typeTags](name: String, func: Function$x[$types]): 
UserDefinedFunction = {
+        |  val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
+        |  val inputTypes = Try($inputTypes).toOption
+        |  def builder(e: Seq[Expression]) = if (e.length == $x) {
+        |    ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), 
Some(name), nullable, udfDeterministic = true)
+        |  } else {
+        |    throw new AnalysisException("Invalid number of arguments for 
function " + name +
+        |      ". Expected: $x; Found: " + e.length)
+        |  }
+        |  functionRegistry.createOrReplaceTempFunction(name, builder)
+        |  val udf = UserDefinedFunction(func, dataType, 
inputTypes).withName(name)
+        |  if (nullable) udf else udf.asNonNullable()
+        |}""".stripMargin)
     }
 
     (0 to 22).foreach { i =>
@@ -144,7 +144,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
       val funcCall = if (i == 0) "() => func" else "func"
       println(s"""
         |/**
-        | * Register a user-defined function with ${i} arguments.
+        | * Register a deterministic Java UDF$i instance as user-defined 
function (UDF).
         | * @since $version
         | */
         |def register(name: String, f: UDF$i[$extTypeArgs], returnType: 
DataType): Unit = {
@@ -689,7 +689,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 0 arguments.
+   * Register a deterministic Java UDF0 instance as user-defined function 
(UDF).
    * @since 2.3.0
    */
   def register(name: String, f: UDF0[_], returnType: DataType): Unit = {
@@ -704,7 +704,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 1 arguments.
+   * Register a deterministic Java UDF1 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
@@ -719,7 +719,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 2 arguments.
+   * Register a deterministic Java UDF2 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF2[_, _, _], returnType: DataType): Unit = {
@@ -734,7 +734,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 3 arguments.
+   * Register a deterministic Java UDF3 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit 
= {
@@ -749,7 +749,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 4 arguments.
+   * Register a deterministic Java UDF4 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType): 
Unit = {
@@ -764,7 +764,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 5 arguments.
+   * Register a deterministic Java UDF5 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType): 
Unit = {
@@ -779,7 +779,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 6 arguments.
+   * Register a deterministic Java UDF6 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: 
DataType): Unit = {
@@ -794,7 +794,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 7 arguments.
+   * Register a deterministic Java UDF7 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: 
DataType): Unit = {
@@ -809,7 +809,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 8 arguments.
+   * Register a deterministic Java UDF8 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: 
DataType): Unit = {
@@ -824,7 +824,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 9 arguments.
+   * Register a deterministic Java UDF9 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], 
returnType: DataType): Unit = {
@@ -839,7 +839,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 10 arguments.
+   * Register a deterministic Java UDF10 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], 
returnType: DataType): Unit = {
@@ -854,7 +854,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 11 arguments.
+   * Register a deterministic Java UDF11 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], 
returnType: DataType): Unit = {
@@ -869,7 +869,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 12 arguments.
+   * Register a deterministic Java UDF12 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], 
returnType: DataType): Unit = {
@@ -884,7 +884,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 13 arguments.
+   * Register a deterministic Java UDF13 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_], returnType: DataType): Unit = {
@@ -899,7 +899,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 14 arguments.
+   * Register a deterministic Java UDF14 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _], returnType: DataType): Unit = {
@@ -914,7 +914,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 15 arguments.
+   * Register a deterministic Java UDF15 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _], returnType: DataType): Unit = {
@@ -929,7 +929,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 16 arguments.
+   * Register a deterministic Java UDF16 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _], returnType: DataType): Unit = {
@@ -944,7 +944,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 17 arguments.
+   * Register a deterministic Java UDF17 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _], returnType: DataType): Unit = {
@@ -959,7 +959,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 18 arguments.
+   * Register a deterministic Java UDF18 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _, _], returnType: DataType): Unit = {
@@ -974,7 +974,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 19 arguments.
+   * Register a deterministic Java UDF19 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -989,7 +989,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 20 arguments.
+   * Register a deterministic Java UDF20 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -1004,7 +1004,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 21 arguments.
+   * Register a deterministic Java UDF21 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -1019,7 +1019,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   }
 
   /**
-   * Register a user-defined function with 22 arguments.
+   * Register a deterministic Java UDF22 instance as user-defined function 
(UDF).
    * @since 1.3.0
    */
   def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, 
_, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index 03b654f..40a058d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -66,6 +66,7 @@ case class UserDefinedFunction protected[sql] (
    *
    * @since 1.3.0
    */
+  @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
     Column(ScalaUDF(
       f,

http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 530a525..0d11682 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -24,6 +24,7 @@ import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.spark.annotation.InterfaceStability
+import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -32,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
-import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -3254,42 +3254,66 @@ object functions {
    */
   def map_values(e: Column): Column = withExpr { MapValues(e.expr) }
 
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-  
//////////////////////////////////////////////////////////////////////////////////////////////
-
   // scalastyle:off line.size.limit
   // scalastyle:off parameter.number
 
   /* Use the following code to generate:
-  (0 to 10).map { x =>
+
+  (0 to 10).foreach { x =>
     val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
     val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: 
TypeTag")(_ + ", " + _)
     val inputTypes = (1 to x).foldRight("Nil")((i, s) => 
{s"ScalaReflection.schemaFor(typeTag[A$i]).dataType :: $s"})
     println(s"""
-    /**
-     * Defines a deterministic user-defined function of ${x} arguments as 
user-defined
-     * function (UDF). The data types are automatically inferred based on the 
function's
-     * signature. To change a UDF to nondeterministic, call the API
-     * `UserDefinedFunction.asNondeterministic()`.
-     *
-     * @group udf_funcs
-     * @since 1.3.0
-     */
-    def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
-      val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
-      val inputTypes = Try($inputTypes).toOption
-      val udf = UserDefinedFunction(f, dataType, inputTypes)
-      if (nullable) udf else udf.asNonNullable()
-    }""")
+      |/**
+      | * Defines a Scala closure of $x arguments as user-defined function 
(UDF).
+      | * The data types are automatically inferred based on the Scala 
closure's
+      | * signature. By default the returned UDF is deterministic. To change 
it to
+      | * nondeterministic, call the API 
`UserDefinedFunction.asNondeterministic()`.
+      | *
+      | * @group udf_funcs
+      | * @since 1.3.0
+      | */
+      |def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
+      |  val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
+      |  val inputTypes = Try($inputTypes).toOption
+      |  val udf = UserDefinedFunction(f, dataType, inputTypes)
+      |  if (nullable) udf else udf.asNonNullable()
+      |}""".stripMargin)
+  }
+
+  (0 to 10).foreach { i =>
+    val extTypeArgs = (0 to i).map(_ => "_").mkString(", ")
+    val anyTypeArgs = (0 to i).map(_ => "Any").mkString(", ")
+    val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs]]"
+    val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
+    val funcCall = if (i == 0) "() => func" else "func"
+    println(s"""
+      |/**
+      | * Defines a Java UDF$i instance as user-defined function (UDF).
+      | * The caller must specify the output data type, and there is no 
automatic input type coercion.
+      | * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+      | * API `UserDefinedFunction.asNondeterministic()`.
+      | *
+      | * @group udf_funcs
+      | * @since 2.3.0
+      | */
+      |def udf(f: UDF$i[$extTypeArgs], returnType: DataType): 
UserDefinedFunction = {
+      |  val func = f$anyCast.call($anyParams)
+      |  UserDefinedFunction($funcCall, returnType, inputTypes = None)
+      |}""".stripMargin)
   }
 
   */
 
+  
//////////////////////////////////////////////////////////////////////////////////////////////
+  // Scala UDF functions
+  
//////////////////////////////////////////////////////////////////////////////////////////////
+
   /**
-   * Defines a deterministic user-defined function of 0 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 0 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3302,10 +3326,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 1 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 1 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3318,10 +3342,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 2 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 2 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3334,10 +3358,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 3 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 3 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3350,10 +3374,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 4 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 4 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3366,10 +3390,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 5 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 5 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3382,10 +3406,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 6 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 6 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3398,10 +3422,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 7 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 7 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3414,10 +3438,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 8 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 8 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3430,10 +3454,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 9 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 9 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3446,10 +3470,10 @@ object functions {
   }
 
   /**
-   * Defines a deterministic user-defined function of 10 arguments as 
user-defined
-   * function (UDF). The data types are automatically inferred based on the 
function's
-   * signature. To change a UDF to nondeterministic, call the API
-   * `UserDefinedFunction.asNondeterministic()`.
+   * Defines a Scala closure of 10 arguments as user-defined function (UDF).
+   * The data types are automatically inferred based on the Scala closure's
+   * signature. By default the returned UDF is deterministic. To change it to
+   * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`.
    *
    * @group udf_funcs
    * @since 1.3.0
@@ -3461,13 +3485,172 @@ object functions {
     if (nullable) udf else udf.asNonNullable()
   }
 
+  
//////////////////////////////////////////////////////////////////////////////////////////////
+  // Java UDF functions
+  
//////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Defines a Java UDF0 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
+    // Invoke the Java UDF inside the closure so that it runs on every evaluation.
+    // Calling it here instead would capture one result at definition time and return
+    // that same value for every row, which is wrong for nondeterministic functions.
+    val func = () => f.asInstanceOf[UDF0[Any]].call()
+    UserDefinedFunction(func, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF1 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF1[Any, Any]]
+    val adapter = (a1: Any) => javaFn.call(a1)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF2 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF2[Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any) => javaFn.call(a1, a2)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF3 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF3[Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any) => javaFn.call(a1, a2, a3)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF4 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any) => javaFn.call(a1, a2, a3, a4)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF5 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF6 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any, a6: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5, a6)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF7 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any, a6: Any, a7: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5, a6, a7)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF8 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any, a6: Any, a7: Any, a8: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5, a6, a7, a8)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF9 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any,
+        a6: Any, a7: Any, a8: Any, a9: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5, a6, a7, a8, a9)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
+  /**
+   * Defines a Java UDF10 instance as user-defined function (UDF).
+   * The caller must specify the output data type, and there is no automatic 
input type coercion.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
+   *
+   * @group udf_funcs
+   * @since 2.3.0
+   */
+  def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
+    // View the Java UDF with every type parameter as Any, then adapt it to a Scala function.
+    val javaFn = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]
+    val adapter = (a1: Any, a2: Any, a3: Any, a4: Any, a5: Any,
+        a6: Any, a7: Any, a8: Any, a9: Any, a10: Any) =>
+      javaFn.call(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
+    UserDefinedFunction(adapter, returnType, inputTypes = None)
+  }
+
   // scalastyle:on parameter.number
   // scalastyle:on line.size.limit
 
   /**
    * Defines a deterministic user-defined function (UDF) using a Scala 
closure. For this variant,
    * the caller must specify the output data type, and there is no automatic 
input type coercion.
-   * To change a UDF to nondeterministic, call the API 
`UserDefinedFunction.asNondeterministic()`.
+   * By default the returned UDF is deterministic. To change it to 
nondeterministic, call the
+   * API `UserDefinedFunction.asNondeterministic()`.
    *
    * @param f  A closure in Scala
    * @param dataType  The output data type of the UDF

http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index b007093..4f8a31f 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
 import org.apache.spark.sql.test.TestSparkSession;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.util.sketch.BloomFilter;
@@ -455,4 +456,14 @@ public class JavaDataFrameSuite {
     CircularReference1Bean bean = new CircularReference1Bean();
     spark.createDataFrame(Arrays.asList(bean), CircularReference1Bean.class);
   }
+
+  @Test
+  public void testUDF() {
+    // Wrap a two-argument Java lambda as a UDF that returns a string.
+    UserDefinedFunction concat =
+      udf((Integer key, String value) -> key.toString() + value, DataTypes.StringType);
+    // Apply the UDF to the "key" and "value" columns of the test table.
+    Dataset<Row> projected =
+      spark.table("testData").select(concat.apply(col("key"), col("value")));
+    String[] actual = projected.collectAsList().stream()
+      .map(row -> row.getString(0))
+      .toArray(String[]::new);
+    // Build the expected strings directly from the collected source rows.
+    String[] wanted = spark.table("testData").collectAsList().stream()
+      .map(row -> row.get(0).toString() + row.getString(1))
+      .toArray(String[]::new);
+    Assert.assertArrayEquals(wanted, actual);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 7f1c009..db37be6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
-import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.{DataTypes, DoubleType}
 
 private case class FunctionResult(f1: String, f2: String)
 
@@ -128,6 +129,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     val df2 = testData.select(bar())
     
assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic))
     assert(df2.head().getDouble(0) >= 0.0)
+
+    val javaUdf = udf(new UDF0[Double] {
+      override def call(): Double = Math.random()
+    }, DoubleType).asNondeterministic()
+    val df3 = testData.select(javaUdf())
+    
assert(df3.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic))
+    assert(df3.head().getDouble(0) >= 0.0)
   }
 
   test("TwoArgument UDF") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to