This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5317f1  [SPARK-27893][SQL][PYTHON] Create an integrated test base for 
Python, Scalar Pandas, Scala UDF by sql files
f5317f1 is described below

commit f5317f10b25bd193cf5026a8f4fd1cd1ded8f5b4
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Mon Jun 3 10:03:36 2019 +0900

    [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, 
Scalar Pandas, Scala UDF by sql files
    
    ## What changes were proposed in this pull request?
    
    This PR targets to add an integrated test base for various UDF test cases 
so that Scalar UDF, Python UDF and Scalar Pandas UDFs can be tested in SBT & 
Maven tests.
    
    ### Problem
    
    One of the problems we face is that: `ExtractPythonUDFs` (for Python UDF 
and Scalar Pandas UDF) has unevaluable expressions that always has to be 
wrapped with special plans. This special rule seems producing many issues, for 
instance, SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and 
SPARK-24721.
    
    ### Why do we have less test cases dedicated for SQL and plans with Python 
UDFs?
    
    We have virtually no such SQL (or plan) dedicated tests in PySpark to catch 
such issues because:
      - A developer should know all the analyzer, the optimizer, SQL, PySpark, 
Py4J and version differences in Python to write such good test cases
      - To test plans, we should access to plans in JVM via Py4J which is 
tricky, messy and duplicates Scala test cases
      - Usually we just add end-to-end test cases in PySpark therefore there 
are not so many dedicated examples to refer to write in PySpark
    
    It is also a non-trivial overhead to switch test base and method (IMHO).
    
    ### How does this PR fix?
    
    This PR adds Python UDF and Scalar Pandas UDF into our `*.sql` file based 
test base in runtime of SBT / Maven test cases. It generates Python-pickled 
instance (consisting of return type and Python native function) that is used in 
Python or Scalar Pandas UDF and directly brings into JVM.
    
    After that, (we don't interact via Py4J) run the tests directly in JVM - we 
can just register and run Python UDF and Scalar Pandas UDF in JVM.
    
    Currently, I only integrated this change into SQL file based testing. This 
is how works with test files under `udf` directory:
    
    After the test files under 'inputs/udf' directory are detected, it creates 
three test cases:
      - Scala UDF test case with a Scalar UDF registered named 'udf'.
      - Python UDF test case with a Python UDF registered named 'udf' iff 
Python executable and pyspark are available.
      - Scalar Pandas UDF test case with a Scalar Pandas UDF registered named 
'udf' iff Python executable, pandas, pyspark and pyarrow are available.
    
    Therefore, UDF test cases should have single input and output files but 
executed by three different types of UDFs.
    
    For instance,
    
    ```sql
    CREATE TEMPORARY VIEW ta AS
    SELECT udf(a) AS a, udf('a') AS tag FROM t1
    UNION ALL
    SELECT udf(a) AS a, udf('b') AS tag FROM t2;
    
    CREATE TEMPORARY VIEW tb AS
    SELECT udf(a) AS a, udf('a') AS tag FROM t3
    UNION ALL
    SELECT udf(a) AS a, udf('b') AS tag FROM t4;
    
    SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
    ```
    
    will be ran 3 times with Scalar UDF, Python UDF and Scalar Pandas UDF each.
    
    ### Appendix
    
    Plus, this PR adds `IntegratedUDFTestUtils` which enables to test and 
execute Python UDF and Scalar Pandas UDFs as below:
    
    To register Python UDF in SQL:
    
    ```scala
    IntegratedUDFTestUtils.registerTestUDF(TestPythonUDF(name = "udf"), spark)
    ```
    
    To register Scalar Pandas UDF in SQL:
    
    ```scala
    IntegratedUDFTestUtils.registerTestUDF(TestScalarPandasUDF(name = "udf"), 
spark)
    ```
    
     To use it in Scala API:
    
    ```scala
    spark.select(expr("udf(1)").show()
    ```
    
     To use it in SQL:
    
    ```scala
    sql("SELECT udf(1)").show()
    ```
    
    This util could be used in the future for better coverage with Scala API 
combinations as well.
    
    ## How was this patch tested?
    
    Tested via the command below:
    
    ```bash
    build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
    ```
    
    ```
    [info] SQLQueryTestSuite:
    [info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
    [info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
    [info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 
milliseconds)
    ```
    
    [python] unavailable:
    
    ```
    [info] SQLQueryTestSuite:
    [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
    [info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] 
and/or pyspark were not available. !!! IGNORED !!!
    [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
    ```
    
    pyspark unavailable:
    
    ```
    [info] SQLQueryTestSuite:
    [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
    [info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] 
and/or pyspark were not available. !!! IGNORED !!!
    [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
    ```
    
    pandas and/or pyarrow unavailable:
    
    ```
    [info] SQLQueryTestSuite:
    [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
    [info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
    [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
    ```
    
    Closes #24752 from HyukjinKwon/udf-tests.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  13 +-
 .../sql-tests/inputs/udf/udf-inner-join.sql        |  17 ++
 .../sql-tests/results/udf/udf-inner-join.sql.out   |  67 ++++++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 251 +++++++++++++++++++++
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  95 ++++++--
 5 files changed, 424 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index dca8704..4dcc5eb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -79,12 +79,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / 
conf.get(EXECUTOR_CORES))
 
   // All the Python functions should have the same exec, version and envvars.
-  protected val envVars = funcs.head.funcs.head.envVars
-  protected val pythonExec = funcs.head.funcs.head.pythonExec
-  protected val pythonVer = funcs.head.funcs.head.pythonVer
+  protected val envVars: java.util.Map[String, String] = 
funcs.head.funcs.head.envVars
+  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
+  protected val pythonVer: String = funcs.head.funcs.head.pythonVer
 
   // TODO: support accumulator in multiple UDF
-  protected val accumulator = funcs.head.funcs.head.accumulator
+  protected val accumulator: PythonAccumulatorV2 = 
funcs.head.funcs.head.accumulator
+
+  // Python accumulator is always set in production except in tests. See 
SPARK-27893
+  private val maybeAccumulator: Option[PythonAccumulatorV2] = 
Option(accumulator)
 
   // Expose a ServerSocket to support method calls via socket from Python side.
   private[spark] var serverSocket: Option[ServerSocket] = None
@@ -465,7 +468,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         val updateLen = stream.readInt()
         val update = new Array[Byte](updateLen)
         stream.readFully(update)
-        accumulator.add(update)
+        maybeAccumulator.foreach(_.add(update))
       }
       // Check whether the worker is ready to be re-used.
       if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql 
b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql
new file mode 100644
index 0000000..8bd61b8
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql
@@ -0,0 +1,17 @@
+-- This test file was converted from inner-join.sql.
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+
+CREATE TEMPORARY VIEW ta AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t1
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t2;
+
+CREATE TEMPORARY VIEW tb AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t3
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t4;
+
+SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out
new file mode 100644
index 0000000..10952cb
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out
@@ -0,0 +1,67 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 7
+
+
+-- !query 0
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+
+
+
+-- !query 3
+CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+CREATE TEMPORARY VIEW ta AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t1
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t2
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+CREATE TEMPORARY VIEW tb AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t3
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t4
+-- !query 5 schema
+struct<>
+-- !query 5 output
+
+
+
+-- !query 6
+SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag
+-- !query 6 schema
+struct<a:string,tag:string>
+-- !query 6 output
+1      a
+1      a
+1      b
+1      b
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
new file mode 100644
index 0000000..9e3b1ca
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.spark.TestUtils
+import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, 
PythonFunction}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.config.Tests
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.expressions.SparkUserDefinedFunction
+import org.apache.spark.sql.types.StringType
+
+/**
+ * This object targets to integrate various UDF test cases so that Scalar UDF, 
Python UDF and
+ * Scalar Pandas UDFs can be tested in SBT & Maven tests.
+ *
+ * The available UDFs cast input to strings, which take one column as input 
and return a string
+ * type column as output.
+ *
+ * To register Scala UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestScalaUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To register Python UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestPythonUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To register Scalar Pandas UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestScalarPandasUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To use it in Scala API and SQL:
+ * {{{
+ *   sql("SELECT udf_name(1)")
+ *   spark.select(expr("udf_name(1)")
+ * }}}
+ */
+object IntegratedUDFTestUtils extends SQLHelper {
+  import scala.sys.process._
+
+  private lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
+  private lazy val sparkHome = if (sys.props.contains(Tests.IS_TESTING.key)) {
+    assert(sys.props.contains("spark.test.home"), "spark.test.home is not 
set.")
+    sys.props("spark.test.home")
+  } else {
+    assert(sys.env.contains("SPARK_HOME"), "SPARK_HOME is not set.")
+    sys.env("SPARK_HOME")
+  }
+  // Note that we will directly refer pyspark's source, not the zip from a 
regular build.
+  // It is possible the test is being ran without the build.
+  private lazy val sourcePath = Paths.get(sparkHome, "python").toAbsolutePath
+  private lazy val py4jPath = Paths.get(
+    sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").toAbsolutePath
+  private lazy val pysparkPythonPath = s"$py4jPath:$sourcePath"
+
+  private lazy val isPythonAvailable: Boolean = 
TestUtils.testCommandAvailable(pythonExec)
+
+  private lazy val isPySparkAvailable: Boolean = isPythonAvailable && Try {
+    Process(
+      Seq(pythonExec, "-c", "import pyspark"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val isPandasAvailable: Boolean = isPythonAvailable && 
isPySparkAvailable && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pandas_version;" +
+          "require_minimum_pandas_version()"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val isPyArrowAvailable: Boolean = isPythonAvailable && 
isPySparkAvailable  && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pyarrow_version;" +
+          "require_minimum_pyarrow_version()"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val pythonVer = if (isPythonAvailable) {
+    Process(
+      Seq(pythonExec, "-c", "import sys; print('%d.%d' % 
sys.version_info[:2])"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!.trim()
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] is 
unavailable.")
+  }
+
+  // Dynamically pickles and reads the Python instance into JVM side in order 
to mimic
+  // Python native function within Python UDF.
+  private lazy val pythonFunc: Array[Byte] = if (shouldTestPythonUDFs) {
+    var binaryPythonFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          "from pyspark.sql.types import StringType; " +
+            "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            s"f.write(CloudPickleSerializer().dumps((lambda x: str(x), 
StringType())))"),
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+      binaryPythonFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPythonFunc != null)
+    binaryPythonFunc
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] and/or 
pyspark are unavailable.")
+  }
+
+  private lazy val pandasFunc: Array[Byte] = if (shouldTestScalarPandasUDFs) {
+    var binaryPandasFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          "from pyspark.sql.types import StringType; " +
+            "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            s"f.write(CloudPickleSerializer().dumps((lambda x: x.apply(str), 
StringType())))"),
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+      binaryPandasFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPandasFunc != null)
+    binaryPandasFunc
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] and/or 
pyspark are unavailable.")
+  }
+
+  // Make sure this map stays mutable - this map gets updated later in Python 
runners.
+  private val workerEnv = new java.util.HashMap[String, String]()
+  workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath")
+
+  lazy val pythonExec: String = {
+    val pythonExec = sys.env.getOrElse(
+      "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", 
"python3.6"))
+    if (TestUtils.testCommandAvailable(pythonExec)) {
+      pythonExec
+    } else {
+      "python"
+    }
+  }
+
+  lazy val shouldTestPythonUDFs: Boolean = isPythonAvailable && 
isPySparkAvailable
+
+  lazy val shouldTestScalarPandasUDFs: Boolean =
+    isPythonAvailable && isPandasAvailable && isPyArrowAvailable
+
+  /**
+   * A base trait for various UDFs defined in this object.
+   */
+  sealed trait TestUDF
+
+  /**
+   * A Python UDF that takes one column and returns a string column.
+   * Equivalent to `udf(lambda x: str(x), "string")`
+   */
+  case class TestPythonUDF(name: String) extends TestUDF {
+    lazy val udf = UserDefinedPythonFunction(
+      name = name,
+      func = PythonFunction(
+        command = pythonFunc,
+        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
+        pythonIncludes = List.empty[String].asJava,
+        pythonExec = pythonExec,
+        pythonVer = pythonVer,
+        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+        accumulator = null),
+      dataType = StringType,
+      pythonEvalType = PythonEvalType.SQL_BATCHED_UDF,
+      udfDeterministic = true)
+  }
+
+  /**
+   * A Scalar Pandas UDF that takes one column and returns a string column.
+   * Equivalent to `pandas_udf(lambda x: x.apply(str), "string", 
PandasUDFType.SCALAR)`.
+   */
+  case class TestScalarPandasUDF(name: String) extends TestUDF {
+    lazy val udf = UserDefinedPythonFunction(
+      name = name,
+      func = PythonFunction(
+        command = pandasFunc,
+        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
+        pythonIncludes = List.empty[String].asJava,
+        pythonExec = pythonExec,
+        pythonVer = pythonVer,
+        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+        accumulator = null),
+      dataType = StringType,
+      pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+      udfDeterministic = true)
+  }
+
+  /**
+   * A Scala UDF that takes one column and returns a string column.
+   * Equivalent to `udf((input: Any) => input.toString)`.
+   */
+  case class TestScalaUDF(name: String) extends TestUDF {
+    lazy val udf = SparkUserDefinedFunction(
+      (input: Any) => input.toString,
+      StringType,
+      inputSchemas = Seq.fill(1)(None))
+  }
+
+  /**
+   * Register UDFs used in this test case.
+   */
+  def registerTestUDF(testUDF: TestUDF, session: SparkSession): Unit = testUDF 
match {
+    case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
+    case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, 
udf.udf)
+    case udf: TestScalaUDF => session.udf.register(udf.name, udf.udf)
+    case other => throw new RuntimeException(s"Unknown UDF class 
[${other.getClass}]")
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 169fb21..6e68e57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -86,9 +86,25 @@ import org.apache.spark.sql.types.StructType
  *   -- !query 1
  *   ...
  * }}}
+ *
+ * Note that UDF tests work differently. After the test files under 
'inputs/udf' directory are
+ * detected, it creates three test cases:
+ *
+ *  - Scala UDF test case with a Scalar UDF registered as the name 'udf'.
+ *
+ *  - Python UDF test case with a Python UDF registered as the name 'udf'
+ *    iff Python executable and pyspark are available.
+ *
+ *  - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the 
name 'udf'
+ *    iff Python executable, pyspark, pandas and pyarrow are available.
+ *
+ * Therefore, UDF test cases should have single input and output files but 
executed by three
+ * different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
  */
 class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
 
+  import IntegratedUDFTestUtils._
+
   private val regenerateGoldenFiles: Boolean = 
System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 
   private val baseResourcePath = {
@@ -115,9 +131,6 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
   // Create all the test cases.
   listTestCases().foreach(createScalaTestCase)
 
-  /** A test case. */
-  private case class TestCase(name: String, inputFile: String, resultFile: 
String)
-
   /** A single SQL query's output. */
   private case class QueryOutput(sql: String, schema: String, output: String) {
     def toString(queryIndex: Int): String = {
@@ -126,19 +139,47 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
         sql + "\n" +
         s"-- !query $queryIndex schema\n" +
         schema + "\n" +
-         s"-- !query $queryIndex output\n" +
+        s"-- !query $queryIndex output\n" +
         output
     }
   }
 
+  /** A test case. */
+  private trait TestCase {
+    val name: String
+    val inputFile: String
+    val resultFile: String
+  }
+
+  /** A regular test case. */
+  private case class RegularTestCase(
+      name: String, inputFile: String, resultFile: String) extends TestCase
+
+  /** A UDF test case. */
+  private case class UDFTestCase(
+      name: String, inputFile: String, resultFile: String, udf: TestUDF) 
extends TestCase
+
   private def createScalaTestCase(testCase: TestCase): Unit = {
     if (blackList.exists(t =>
         
testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
       // Create a test case to ignore this case.
       ignore(testCase.name) { /* Do nothing */ }
-    } else {
-      // Create a test case to run this case.
-      test(testCase.name) { runTest(testCase) }
+    } else testCase match {
+      case UDFTestCase(_, _, _, udf: TestPythonUDF) if !shouldTestPythonUDFs =>
+        ignore(s"${testCase.name} is skipped because " +
+          s"[$pythonExec] and/or pyspark were not available.") {
+          /* Do nothing */
+        }
+      case UDFTestCase(_, _, _, udf: TestScalarPandasUDF) if 
!shouldTestScalarPandasUDFs =>
+        ignore(s"${testCase.name} is skipped because pyspark," +
+          s"pandas and/or pyarrow were not available in [$pythonExec].") {
+          /* Do nothing */
+        }
+      case _ =>
+        // Create a test case to run this case.
+        test(testCase.name) {
+          runTest(testCase)
+        }
     }
   }
 
@@ -166,7 +207,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
     // When we are regenerating the golden files, we don't need to set any 
config as they
     // all need to return the same result
     if (regenerateGoldenFiles) {
-      runQueries(queries, testCase.resultFile, None)
+      runQueries(queries, testCase, None)
     } else {
       val configSets = {
         val configLines = 
comments.filter(_.startsWith("--SET")).map(_.substring(5))
@@ -188,7 +229,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 
       configSets.foreach { configSet =>
         try {
-          runQueries(queries, testCase.resultFile, Some(configSet))
+          runQueries(queries, testCase, Some(configSet))
         } catch {
           case e: Throwable =>
             val configs = configSet.map {
@@ -203,12 +244,16 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 
   private def runQueries(
       queries: Seq[String],
-      resultFileName: String,
+      testCase: TestCase,
       configSet: Option[Seq[(String, String)]]): Unit = {
     // Create a local SparkSession to have stronger isolation between 
different test cases.
     // This does not isolate catalog changes.
     val localSparkSession = spark.newSession()
     loadTestData(localSparkSession)
+    testCase match {
+      case udfTestCase: UDFTestCase => registerTestUDF(udfTestCase.udf, 
localSparkSession)
+      case _ => // Don't add UDFs in Regular tests.
+    }
 
     if (configSet.isDefined) {
       // Execute the list of set operation in order to add the desired configs
@@ -233,7 +278,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
         s"-- Number of queries: ${outputs.size}\n\n\n" +
         outputs.zipWithIndex.map{case (qr, i) => 
qr.toString(i)}.mkString("\n\n\n") + "\n"
       }
-      val resultFile = new File(resultFileName)
+      val resultFile = new File(testCase.resultFile)
       val parent = resultFile.getParentFile
       if (!parent.exists()) {
         assert(parent.mkdirs(), "Could not create directory: " + parent)
@@ -243,7 +288,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 
     // Read back the golden file.
     val expectedOutputs: Seq[QueryOutput] = {
-      val goldenOutput = fileToString(new File(resultFileName))
+      val goldenOutput = fileToString(new File(testCase.resultFile))
       val segments = goldenOutput.split("-- !query.+\n")
 
       // each query has 3 segments, plus the header
@@ -322,11 +367,33 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
   }
 
   private def listTestCases(): Seq[TestCase] = {
-    listFilesRecursively(new File(inputFilePath)).map { file =>
+    listFilesRecursively(new File(inputFilePath)).flatMap { file =>
       val resultFile = file.getAbsolutePath.replace(inputFilePath, 
goldenFilePath) + ".out"
       val absPath = file.getAbsolutePath
       val testCaseName = 
absPath.stripPrefix(inputFilePath).stripPrefix(File.separator)
-      TestCase(testCaseName, absPath, resultFile)
+
+      if 
(file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
+        Seq(
+          UDFTestCase(
+            s"$testCaseName - Scala UDF",
+            absPath,
+            resultFile,
+            TestScalaUDF(name = "udf")),
+
+          UDFTestCase(
+            s"$testCaseName - Python UDF",
+            absPath,
+            resultFile,
+            TestPythonUDF(name = "udf")),
+
+          UDFTestCase(
+            s"$testCaseName - Scalar Pandas UDF",
+            absPath,
+            resultFile,
+            TestScalarPandasUDF(name = "udf")))
+      } else {
+        RegularTestCase(testCaseName, absPath, resultFile) :: Nil
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to