Repository: spark
Updated Branches:
  refs/heads/master a819dab66 -> 0ef16bd4b


[SPARK-20668][SQL] Modify ScalaUDF to handle nullability.

## What changes were proposed in this pull request?

When registering a Scala UDF, we can determine from its return type whether
the UDF may return a nullable value. `ScalaUDF` and related classes should
carry that nullability instead of always reporting `nullable = true`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ues...@databricks.com>

Closes #17911 from ueshin/issues/SPARK-20668.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ef16bd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ef16bd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ef16bd4

Branch: refs/heads/master
Commit: 0ef16bd4b00ba5d68c9767d9401d12566f5637c1
Parents: a819dab
Author: Takuya UESHIN <ues...@databricks.com>
Authored: Tue May 9 23:48:25 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue May 9 23:48:25 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   2 +-
 .../sql/catalyst/expressions/ScalaUDF.scala     |   6 +-
 .../org/apache/spark/sql/UDFRegistration.scala  | 144 +++++++++----------
 .../sql/expressions/UserDefinedFunction.scala   |  37 ++++-
 .../scala/org/apache/spark/sql/functions.scala  |  37 +++--
 5 files changed, 137 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ef16bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c56dd36..c5c2a5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2128,7 +2128,7 @@ class Analyzer(
 
       case p => p transformExpressionsUp {
 
-        case udf @ ScalaUDF(func, _, inputs, _, _) =>
+        case udf @ ScalaUDF(func, _, inputs, _, _, _) =>
           val parameterTypes = ScalaReflection.getParameterTypes(func)
           assert(parameterTypes.length == inputs.length)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef16bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 228f4b7..af1eba2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -36,17 +36,17 @@ import org.apache.spark.sql.types.DataType
  *                    better to use Option of Seq[DataType] so we can use 
"None" as the case for no
  *                    type coercion. However, that would require more 
refactoring of the codebase.
  * @param udfName   The user-specified name of this UDF.
+ * @param nullable  True if the UDF can return null value.
  */
 case class ScalaUDF(
     function: AnyRef,
     dataType: DataType,
     children: Seq[Expression],
     inputTypes: Seq[DataType] = Nil,
-    udfName: Option[String] = None)
+    udfName: Option[String] = None,
+    nullable: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression {
 
-  override def nullable: Boolean = true
-
   override def toString: String =
     s"${udfName.map(name => 
s"UDF:$name").getOrElse("UDF")}(${children.mkString(", ")})"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef16bd4/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 6accf1f..5fd7123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -110,11 +110,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
          * @since 1.3.0
          */
         def register[$typeTags](name: String, func: Function$x[$types]): 
UserDefinedFunction = {
-          val dataType = ScalaReflection.schemaFor[RT].dataType
+          val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
           val inputTypes = Try($inputTypes).toOption
-          def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+          def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
           functionRegistry.registerFunction(name, builder)
-          UserDefinedFunction(func, dataType, inputTypes)
+          UserDefinedFunction(func, dataType, 
inputTypes).withNullability(nullable)
         }""")
     }
 
@@ -143,11 +143,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag](name: String, func: Function0[RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -156,11 +156,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, 
RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -169,11 +169,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: 
Function2[A1, A2, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -182,11 +182,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: 
String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -195,11 +195,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -208,11 +208,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -221,11 +221,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, 
A4, A5, A6, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -234,11 +234,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: 
Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -247,11 +247,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, 
func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -260,11 +260,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: 
String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -273,11 +273,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, 
A10, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -286,11 +286,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, 
A7, A8, A9, A10, A11, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -299,11 +299,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, 
A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -312,11 +312,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: 
Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -325,11 +325,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, 
func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, 
RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: 
ScalaReflection.schemaFor[A14].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -338,11 +338,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: 
TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, 
A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: 
ScalaReflection.schemaFor[A14].dataType :: 
ScalaReflection.schemaFor[A15].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -351,11 +351,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, 
A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, 
A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: 
ScalaReflection.schemaFor[A14].dataType :: 
ScalaReflection.schemaFor[A15].dataType :: 
ScalaReflection.schemaFor[A16].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -364,11 +364,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, 
A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, 
A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): 
UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: 
ScalaReflection.schemaFor[A14].dataType :: 
ScalaReflection.schemaFor[A15].dataType :: 
ScalaReflection.schemaFor[A16].dataType :: 
ScalaReflection.schemaFor[A17].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -377,11 +377,11 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: 
TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: 
TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, 
A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, 
A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, 
RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: 
ScalaReflection.schemaFor[A2].dataType :: 
ScalaReflection.schemaFor[A3].dataType :: 
ScalaReflection.schemaFor[A4].dataType :: 
ScalaReflection.schemaFor[A5].dataType :: 
ScalaReflection.schemaFor[A6].dataType :: 
ScalaReflection.schemaFor[A7].dataType :: 
ScalaReflection.schemaFor[A8].dataType :: 
ScalaReflection.schemaFor[A9].dataType :: 
ScalaReflection.schemaFor[A10].dataType :: 
ScalaReflection.schemaFor[A11].dataType :: 
ScalaReflection.schemaFor[A12].dataType :: 
ScalaReflection.schemaFor[A13].dataType :: 
ScalaReflection.schemaFor[A14].dataType :: 
ScalaReflection.schemaFor[A15].dataType :: 
ScalaReflection.schemaFor[A16].dataType :: 
ScalaReflection.schemaFor[A17].dataType :: 
ScalaReflection.schemaFor[A18].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -390,11 +390,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -403,11 +403,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -416,11 +416,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: ScalaReflection.schemaFor[A21].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -429,11 +429,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * @since 1.3.0
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = {
-    val dataType = ScalaReflection.schemaFor[RT].dataType
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: ScalaReflection.schemaFor[A2].dataType :: ScalaReflection.schemaFor[A3].dataType :: ScalaReflection.schemaFor[A4].dataType :: ScalaReflection.schemaFor[A5].dataType :: ScalaReflection.schemaFor[A6].dataType :: ScalaReflection.schemaFor[A7].dataType :: ScalaReflection.schemaFor[A8].dataType :: ScalaReflection.schemaFor[A9].dataType :: ScalaReflection.schemaFor[A10].dataType :: ScalaReflection.schemaFor[A11].dataType :: ScalaReflection.schemaFor[A12].dataType :: ScalaReflection.schemaFor[A13].dataType :: ScalaReflection.schemaFor[A14].dataType :: ScalaReflection.schemaFor[A15].dataType :: ScalaReflection.schemaFor[A16].dataType :: ScalaReflection.schemaFor[A17].dataType :: ScalaReflection.schemaFor[A18].dataType :: ScalaReflection.schemaFor[A19].dataType :: ScalaReflection.schemaFor[A20].dataType :: ScalaReflection.schemaFor[A21].dataType :: ScalaReflection.schemaFor[A22].dataType :: Nil).toOption
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name))
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType, inputTypes)
+    UserDefinedFunction(func, dataType, inputTypes).withNullability(nullable)
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef16bd4/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index b13fe70..5a0f488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -47,12 +47,47 @@ case class UserDefinedFunction protected[sql] (
     dataType: DataType,
     inputTypes: Option[Seq[DataType]]) {
 
+  private var _nullable: Boolean = true
+
+  /**
+   * Returns true when the UDF can return a nullable value.
+   *
+   * @since 2.3.0
+   */
+  def nullable: Boolean = _nullable
+
   /**
    * Returns an expression that invokes the UDF, using the given arguments.
    *
    * @since 1.3.0
    */
   def apply(exprs: Column*): Column = {
-    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
+    Column(ScalaUDF(
+      f,
+      dataType,
+      exprs.map(_.expr),
+      inputTypes.getOrElse(Nil),
+      nullable = _nullable))
+  }
+
+  private def copyAll(): UserDefinedFunction = {
+    val udf = copy()
+    udf._nullable = _nullable
+    udf
+  }
+
+  /**
+   * Updates UserDefinedFunction with a given nullability.
+   *
+   * @since 2.3.0
+   */
+  def withNullability(nullable: Boolean): UserDefinedFunction = {
+    if (nullable == _nullable) {
+      this
+    } else {
+      val udf = copyAll()
+      udf._nullable = nullable
+      udf
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef16bd4/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 987011e..b9769f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3169,12 +3169,14 @@ object functions {
      * @since 1.3.0
      */
     def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
+      val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
       val inputTypes = Try($inputTypes).toOption
-      UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+      UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
     }""")
   }
 
   */
+
   /**
    * Defines a user-defined function of 0 arguments as user-defined function (UDF).
    * The data types are automatically inferred based on the function's signature.
@@ -3183,8 +3185,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3195,8 +3198,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3207,8 +3211,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3219,8 +3224,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3231,8 +3237,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3243,8 +3250,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3255,8 +3263,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3267,8 +3276,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3279,8 +3289,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3291,8 +3302,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: ScalaReflection.schemaFor(typeTag[A9]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   /**
@@ -3303,8 +3315,9 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
+    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
     val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: ScalaReflection.schemaFor(typeTag[A2]).dataType :: ScalaReflection.schemaFor(typeTag[A3]).dataType :: ScalaReflection.schemaFor(typeTag[A4]).dataType :: ScalaReflection.schemaFor(typeTag[A5]).dataType :: ScalaReflection.schemaFor(typeTag[A6]).dataType :: ScalaReflection.schemaFor(typeTag[A7]).dataType :: ScalaReflection.schemaFor(typeTag[A8]).dataType :: ScalaReflection.schemaFor(typeTag[A9]).dataType :: ScalaReflection.schemaFor(typeTag[A10]).dataType :: Nil).toOption
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, dataType, inputTypes).withNullability(nullable)
   }
 
   // scalastyle:on parameter.number


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to