This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f5317f1  [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files

f5317f1 is described below

commit f5317f10b25bd193cf5026a8f4fd1cd1ded8f5b4
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Mon Jun 3 10:03:36 2019 +0900

[SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files

## What changes were proposed in this pull request?

This PR adds an integrated test base for various UDF test cases so that Scala UDFs, Python UDFs and Scalar Pandas UDFs can all be tested in SBT and Maven tests.

### Problem

One of the problems we face is that `ExtractPythonUDFs` (for Python UDFs and Scalar Pandas UDFs) deals with unevaluable expressions that always have to be wrapped in special plans. This special rule appears to cause many issues, for instance SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.

### Why do we have fewer test cases dedicated to SQL and plans with Python UDFs?

We have virtually no tests in PySpark dedicated to SQL (or plans) that would catch such issues, because:

- A developer has to know the analyzer, the optimizer, SQL, PySpark, Py4J and the differences between Python versions to write good test cases of this kind.
- To test plans, we have to access the plans in the JVM via Py4J, which is tricky and messy, and duplicates Scala test cases.
- We usually add only end-to-end test cases in PySpark, so there are few dedicated examples to refer to when writing such tests in PySpark.

It is also a non-trivial overhead to switch test bases and methods (IMHO).

### How does this PR fix it?

This PR adds Python UDFs and Scalar Pandas UDFs to our `*.sql` file based test base at runtime of the SBT / Maven test cases. It generates a Python-pickled instance (consisting of the return type and a Python native function) that is used by the Python or Scalar Pandas UDF, and brings it directly into the JVM.
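The pickling step above can be sketched with the Python standard library alone. This is a minimal illustration, not the PR's code: the PR runs `CloudPickleSerializer().dumps((lambda x: str(x), StringType()))` in a Python subprocess and reads the file back with `Files.readAllBytes`. Because stdlib `pickle` cannot serialize lambdas, the builtin `str` stands in for `lambda x: str(x)`, the plain string `"string"` is a hypothetical stand-in for `StringType()`, and the helper names are hypothetical.

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for pyspark.sql.types.StringType().
RETURN_TYPE = "string"


def write_udf_payload(path, func, return_type):
    """Python side: pickle the (function, return type) pair into a file."""
    with open(path, "wb") as f:
        f.write(pickle.dumps((func, return_type)))


def read_udf_payload(path):
    """JVM-side analogue (cf. Files.readAllBytes): read the raw bytes back."""
    with open(path, "rb") as f:
        return f.read()


def round_trip(func=str, return_type=RETURN_TYPE):
    """Write the payload to a temporary file and return the bytes read back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "udf.pickle")
        write_udf_payload(path, func, return_type)
        return read_udf_payload(path)


# Only a Python worker needs to unpickle the bytes again.
payload = round_trip()
func, return_type = pickle.loads(payload)
print(func(1), return_type)  # prints: 1 string
```

On the JVM side the bytes stay opaque: they are handed to `PythonFunction` as `command` and only deserialized again by the Python worker.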
After that, the tests run directly in the JVM without interacting via Py4J. We can simply register and run Python UDFs and Scalar Pandas UDFs in the JVM.

Currently, I only integrated this change into the SQL file based tests. This is how it works with test files under the `udf` directory. After the test files under the 'inputs/udf' directory are detected, three test cases are created for each file:

- A Scala UDF test case with a Scala UDF registered under the name 'udf'.
- A Python UDF test case with a Python UDF registered under the name 'udf', iff the Python executable and pyspark are available.
- A Scalar Pandas UDF test case with a Scalar Pandas UDF registered under the name 'udf', iff the Python executable, pyspark, pandas and pyarrow are available.

Therefore, each UDF test has a single input file and a single output file, but is executed with three different types of UDFs. For instance,

```sql
CREATE TEMPORARY VIEW ta AS
SELECT udf(a) AS a, udf('a') AS tag FROM t1
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t2;

CREATE TEMPORARY VIEW tb AS
SELECT udf(a) AS a, udf('a') AS tag FROM t3
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t4;

SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
```

will be run three times, once each with a Scala UDF, a Python UDF and a Scalar Pandas UDF.

### Appendix

This PR also adds `IntegratedUDFTestUtils`, which makes it possible to test and execute Python UDFs and Scalar Pandas UDFs as below:

To register a Python UDF in SQL:

```scala
import org.apache.spark.sql.IntegratedUDFTestUtils._

IntegratedUDFTestUtils.registerTestUDF(TestPythonUDF(name = "udf"), spark)
```

To register a Scalar Pandas UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(TestScalarPandasUDF(name = "udf"), spark)
```

To use it in the Scala API:

```scala
import org.apache.spark.sql.functions.expr

// SparkSession has no `select`; build a one-row Dataset first.
spark.range(1).select(expr("udf(1)")).show()
```

To use it in SQL:

```scala
sql("SELECT udf(1)").show()
```

This utility could also be used in the future to get better coverage of Scala API combinations.

## How was this patch tested?
Tested via the command below:

```bash
build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 milliseconds)
```

[python] unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
```

pyspark unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```

pandas and/or pyarrow unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```

Closes #24752 from HyukjinKwon/udf-tests.
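The test names and skip messages in the logs above come from expanding each file under `inputs/udf` into three cases. The Python sketch below mirrors that logic under stated assumptions. The real suite is Scala (`listTestCases` and `createScalaTestCase` in `SQLQueryTestSuite`). Its pandas/pyarrow probes call pyspark's `require_minimum_pandas_version` and `require_minimum_pyarrow_version` rather than a bare import. `UDFTestCase`, `can_import` and `expand_udf_test_cases` here are hypothetical Python names.

```python
import os
import subprocess
import sys
from typing import List, NamedTuple, Optional


class UDFTestCase(NamedTuple):
    """Hypothetical Python analogue of the suite's Scala UDFTestCase."""
    name: str
    input_file: str
    result_file: str
    udf_kind: str               # "Scala UDF", "Python UDF" or "Scalar Pandas UDF"
    skip_reason: Optional[str]  # None: run the case; otherwise: report it as ignored


def can_import(python_exec: str, *modules: str) -> bool:
    """Return True if `python_exec -c "import m1, m2, ..."` exits with status 0."""
    try:
        result = subprocess.run(
            [python_exec, "-c", "import " + ", ".join(modules)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:  # e.g. the executable does not exist
        return False
    return result.returncode == 0


def expand_udf_test_cases(input_file: str, input_dir: str, golden_dir: str,
                          python_exec: str = sys.executable) -> List[UDFTestCase]:
    """Expand one file under inputs/udf into three cases sharing one golden file."""
    name = os.path.relpath(input_file, input_dir)
    result_file = os.path.join(golden_dir, name) + ".out"
    has_pyspark = can_import(python_exec, "pyspark")
    # Assumption: a bare import replaces pyspark's minimum-version checks.
    has_arrow_stack = has_pyspark and can_import(python_exec, "pandas", "pyarrow")
    reasons = {
        "Scala UDF": None,  # always runs
        "Python UDF": (None if has_pyspark
                       else f"[{python_exec}] and/or pyspark were not available."),
        "Scalar Pandas UDF": (None if has_arrow_stack
                              else "pyspark, pandas and/or pyarrow were not "
                                   f"available in [{python_exec}]."),
    }
    return [UDFTestCase(f"{name} - {kind}", input_file, result_file, kind, reason)
            for kind, reason in reasons.items()]


if __name__ == "__main__":
    for case in expand_udf_test_cases("/tmp/inputs/udf/udf-inner-join.sql",
                                      "/tmp/inputs", "/tmp/results"):
        print(case.name, "->", case.skip_reason or "runs")
```

Skipped cases are still registered (through ScalaTest's `ignore` in the real suite). That is why they appear as `!!! IGNORED !!!` in the report instead of disappearing from it.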
Authored-by: HyukjinKwon <gurwls...@apache.org>
Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  13 +-
 .../sql-tests/inputs/udf/udf-inner-join.sql        |  17 ++
 .../sql-tests/results/udf/udf-inner-join.sql.out   |  67 ++++++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 251 +++++++++++++++++++++
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  95 ++++++--
 5 files changed, 424 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index dca8704..4dcc5eb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -79,12 +79,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))

   // All the Python functions should have the same exec, version and envvars.
-  protected val envVars = funcs.head.funcs.head.envVars
-  protected val pythonExec = funcs.head.funcs.head.pythonExec
-  protected val pythonVer = funcs.head.funcs.head.pythonVer
+  protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
+  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
+  protected val pythonVer: String = funcs.head.funcs.head.pythonVer

   // TODO: support accumulator in multiple UDF
-  protected val accumulator = funcs.head.funcs.head.accumulator
+  protected val accumulator: PythonAccumulatorV2 = funcs.head.funcs.head.accumulator
+
+  // Python accumulator is always set in production except in tests. See SPARK-27893
+  private val maybeAccumulator: Option[PythonAccumulatorV2] = Option(accumulator)

   // Expose a ServerSocket to support method calls via socket from Python side.
   private[spark] var serverSocket: Option[ServerSocket] = None
@@ -465,7 +468,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         val updateLen = stream.readInt()
         val update = new Array[Byte](updateLen)
         stream.readFully(update)
-        accumulator.add(update)
+        maybeAccumulator.foreach(_.add(update))
       }
       // Check whether the worker is ready to be re-used.
       if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql
new file mode 100644
index 0000000..8bd61b8
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-inner-join.sql
@@ -0,0 +1,17 @@
+-- This test file was converted from inner-join.sql.
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+
+CREATE TEMPORARY VIEW ta AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t1
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t2;
+
+CREATE TEMPORARY VIEW tb AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t3
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t4;
+
+SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out
new file mode 100644
index 0000000..10952cb
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out
@@ -0,0 +1,67 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 7
+
+
+-- !query 0
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+
+
+
+-- !query 3
+CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+CREATE TEMPORARY VIEW ta AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t1
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t2
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+CREATE TEMPORARY VIEW tb AS
+SELECT udf(a) AS a, udf('a') AS tag FROM t3
+UNION ALL
+SELECT udf(a) AS a, udf('b') AS tag FROM t4
+-- !query 5 schema
+struct<>
+-- !query 5 output
+
+
+
+-- !query 6
+SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag
+-- !query 6 schema
+struct<a:string,tag:string>
+-- !query 6 output
+1 a
+1 a
+1 b
+1 b
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
new file mode 100644
index 0000000..9e3b1ca
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.spark.TestUtils
+import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, PythonFunction}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.config.Tests
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.expressions.SparkUserDefinedFunction
+import org.apache.spark.sql.types.StringType
+
+/**
+ * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF and
+ * Scalar Pandas UDFs can be tested in SBT & Maven tests.
+ *
+ * The available UDFs cast input to strings, which take one column as input and return a string
+ * type column as output.
+ *
+ * To register Scala UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestScalaUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To register Python UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestPythonUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To register Scalar Pandas UDF in SQL:
+ * {{{
+ *   registerTestUDF(TestScalarPandasUDF(name = "udf_name"), spark)
+ * }}}
+ *
+ * To use it in Scala API and SQL:
+ * {{{
+ *   sql("SELECT udf_name(1)")
+ *   spark.select(expr("udf_name(1)")
+ * }}}
+ */
+object IntegratedUDFTestUtils extends SQLHelper {
+  import scala.sys.process._
+
+  private lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
+  private lazy val sparkHome = if (sys.props.contains(Tests.IS_TESTING.key)) {
+    assert(sys.props.contains("spark.test.home"), "spark.test.home is not set.")
+    sys.props("spark.test.home")
+  } else {
+    assert(sys.env.contains("SPARK_HOME"), "SPARK_HOME is not set.")
+    sys.env("SPARK_HOME")
+  }
+  // Note that we will directly refer pyspark's source, not the zip from a regular build.
+  // It is possible the test is being ran without the build.
+  private lazy val sourcePath = Paths.get(sparkHome, "python").toAbsolutePath
+  private lazy val py4jPath = Paths.get(
+    sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").toAbsolutePath
+  private lazy val pysparkPythonPath = s"$py4jPath:$sourcePath"
+
+  private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec)
+
+  private lazy val isPySparkAvailable: Boolean = isPythonAvailable && Try {
+    Process(
+      Seq(pythonExec, "-c", "import pyspark"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val isPandasAvailable: Boolean = isPythonAvailable && isPySparkAvailable && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pandas_version;" +
+          "require_minimum_pandas_version()"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val isPyArrowAvailable: Boolean = isPythonAvailable && isPySparkAvailable && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pyarrow_version;" +
+          "require_minimum_pyarrow_version()"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val pythonVer = if (isPythonAvailable) {
+    Process(
+      Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"),
+      None,
+      "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!.trim()
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] is unavailable.")
+  }
+
+  // Dynamically pickles and reads the Python instance into JVM side in order to mimic
+  // Python native function within Python UDF.
+  private lazy val pythonFunc: Array[Byte] = if (shouldTestPythonUDFs) {
+    var binaryPythonFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          "from pyspark.sql.types import StringType; " +
+            "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            s"f.write(CloudPickleSerializer().dumps((lambda x: str(x), StringType())))"),
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+      binaryPythonFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPythonFunc != null)
+    binaryPythonFunc
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
+  }
+
+  private lazy val pandasFunc: Array[Byte] = if (shouldTestScalarPandasUDFs) {
+    var binaryPandasFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          "from pyspark.sql.types import StringType; " +
+            "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            s"f.write(CloudPickleSerializer().dumps((lambda x: x.apply(str), StringType())))"),
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+      binaryPandasFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPandasFunc != null)
+    binaryPandasFunc
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
+  }
+
+  // Make sure this map stays mutable - this map gets updated later in Python runners.
+  private val workerEnv = new java.util.HashMap[String, String]()
+  workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath")
+
+  lazy val pythonExec: String = {
+    val pythonExec = sys.env.getOrElse(
+      "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3.6"))
+    if (TestUtils.testCommandAvailable(pythonExec)) {
+      pythonExec
+    } else {
+      "python"
+    }
+  }
+
+  lazy val shouldTestPythonUDFs: Boolean = isPythonAvailable && isPySparkAvailable
+
+  lazy val shouldTestScalarPandasUDFs: Boolean =
+    isPythonAvailable && isPandasAvailable && isPyArrowAvailable
+
+  /**
+   * A base trait for various UDFs defined in this object.
+   */
+  sealed trait TestUDF
+
+  /**
+   * A Python UDF that takes one column and returns a string column.
+   * Equivalent to `udf(lambda x: str(x), "string")`
+   */
+  case class TestPythonUDF(name: String) extends TestUDF {
+    lazy val udf = UserDefinedPythonFunction(
+      name = name,
+      func = PythonFunction(
+        command = pythonFunc,
+        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
+        pythonIncludes = List.empty[String].asJava,
+        pythonExec = pythonExec,
+        pythonVer = pythonVer,
+        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+        accumulator = null),
+      dataType = StringType,
+      pythonEvalType = PythonEvalType.SQL_BATCHED_UDF,
+      udfDeterministic = true)
+  }
+
+  /**
+   * A Scalar Pandas UDF that takes one column and returns a string column.
+   * Equivalent to `pandas_udf(lambda x: x.apply(str), "string", PandasUDFType.SCALAR)`.
+   */
+  case class TestScalarPandasUDF(name: String) extends TestUDF {
+    lazy val udf = UserDefinedPythonFunction(
+      name = name,
+      func = PythonFunction(
+        command = pandasFunc,
+        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
+        pythonIncludes = List.empty[String].asJava,
+        pythonExec = pythonExec,
+        pythonVer = pythonVer,
+        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+        accumulator = null),
+      dataType = StringType,
+      pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+      udfDeterministic = true)
+  }
+
+  /**
+   * A Scala UDF that takes one column and returns a string column.
+   * Equivalent to `udf((input: Any) => input.toString)`.
+   */
+  case class TestScalaUDF(name: String) extends TestUDF {
+    lazy val udf = SparkUserDefinedFunction(
+      (input: Any) => input.toString,
+      StringType,
+      inputSchemas = Seq.fill(1)(None))
+  }
+
+  /**
+   * Register UDFs used in this test case.
+   */
+  def registerTestUDF(testUDF: TestUDF, session: SparkSession): Unit = testUDF match {
+    case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
+    case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, udf.udf)
+    case udf: TestScalaUDF => session.udf.register(udf.name, udf.udf)
+    case other => throw new RuntimeException(s"Unknown UDF class [${other.getClass}]")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 169fb21..6e68e57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -86,9 +86,25 @@ import org.apache.spark.sql.types.StructType
  *   -- !query 1
  *   ...
  * }}}
+ *
+ * Note that UDF tests work differently. After the test files under 'inputs/udf' directory are
+ * detected, it creates three test cases:
+ *
+ *  - Scala UDF test case with a Scalar UDF registered as the name 'udf'.
+ *
+ *  - Python UDF test case with a Python UDF registered as the name 'udf'
+ *    iff Python executable and pyspark are available.
+ *
+ *  - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the name 'udf'
+ *    iff Python executable, pyspark, pandas and pyarrow are available.
+ *
+ * Therefore, UDF test cases should have single input and output files but executed by three
+ * different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
  */
 class SQLQueryTestSuite extends QueryTest with SharedSQLContext {

+  import IntegratedUDFTestUtils._
+
   private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"

   private val baseResourcePath = {
@@ -115,9 +131,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
   // Create all the test cases.
   listTestCases().foreach(createScalaTestCase)

-  /** A test case. */
-  private case class TestCase(name: String, inputFile: String, resultFile: String)
-
   /** A single SQL query's output. */
   private case class QueryOutput(sql: String, schema: String, output: String) {
     def toString(queryIndex: Int): String = {
@@ -126,19 +139,47 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
         sql + "\n" +
         s"-- !query $queryIndex schema\n" +
         schema + "\n" +
-        s"-- !query $queryIndex output\n" +
+        s"-- !query $queryIndex output\n" +
         output
     }
   }

+  /** A test case. */
+  private trait TestCase {
+    val name: String
+    val inputFile: String
+    val resultFile: String
+  }
+
+  /** A regular test case. */
+  private case class RegularTestCase(
+      name: String, inputFile: String, resultFile: String) extends TestCase
+
+  /** A UDF test case. */
+  private case class UDFTestCase(
+      name: String, inputFile: String, resultFile: String, udf: TestUDF) extends TestCase
+
   private def createScalaTestCase(testCase: TestCase): Unit = {
     if (blackList.exists(t =>
         testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
       // Create a test case to ignore this case.
       ignore(testCase.name) { /* Do nothing */ }
-    } else {
-      // Create a test case to run this case.
-      test(testCase.name) { runTest(testCase) }
+    } else testCase match {
+      case UDFTestCase(_, _, _, udf: TestPythonUDF) if !shouldTestPythonUDFs =>
+        ignore(s"${testCase.name} is skipped because " +
+          s"[$pythonExec] and/or pyspark were not available.") {
+          /* Do nothing */
+        }
+      case UDFTestCase(_, _, _, udf: TestScalarPandasUDF) if !shouldTestScalarPandasUDFs =>
+        ignore(s"${testCase.name} is skipped because pyspark," +
+          s"pandas and/or pyarrow were not available in [$pythonExec].") {
+          /* Do nothing */
+        }
+      case _ =>
+        // Create a test case to run this case.
+        test(testCase.name) {
+          runTest(testCase)
+        }
     }
   }

@@ -166,7 +207,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     // When we are regenerating the golden files, we don't need to set any config as they
     // all need to return the same result
     if (regenerateGoldenFiles) {
-      runQueries(queries, testCase.resultFile, None)
+      runQueries(queries, testCase, None)
     } else {
       val configSets = {
         val configLines = comments.filter(_.startsWith("--SET")).map(_.substring(5))
@@ -188,7 +229,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {

       configSets.foreach { configSet =>
         try {
-          runQueries(queries, testCase.resultFile, Some(configSet))
+          runQueries(queries, testCase, Some(configSet))
         } catch {
           case e: Throwable =>
             val configs = configSet.map {
@@ -203,12 +244,16 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {

   private def runQueries(
       queries: Seq[String],
-      resultFileName: String,
+      testCase: TestCase,
       configSet: Option[Seq[(String, String)]]): Unit = {
     // Create a local SparkSession to have stronger isolation between different test cases.
     // This does not isolate catalog changes.
     val localSparkSession = spark.newSession()
     loadTestData(localSparkSession)
+    testCase match {
+      case udfTestCase: UDFTestCase => registerTestUDF(udfTestCase.udf, localSparkSession)
+      case _ => // Don't add UDFs in Regular tests.
+    }

     if (configSet.isDefined) {
       // Execute the list of set operation in order to add the desired configs
@@ -233,7 +278,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
         s"-- Number of queries: ${outputs.size}\n\n\n" +
         outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n"
       }
-      val resultFile = new File(resultFileName)
+      val resultFile = new File(testCase.resultFile)
       val parent = resultFile.getParentFile
       if (!parent.exists()) {
         assert(parent.mkdirs(), "Could not create directory: " + parent)
@@ -243,7 +288,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {

     // Read back the golden file.
     val expectedOutputs: Seq[QueryOutput] = {
-      val goldenOutput = fileToString(new File(resultFileName))
+      val goldenOutput = fileToString(new File(testCase.resultFile))
       val segments = goldenOutput.split("-- !query.+\n")

       // each query has 3 segments, plus the header
@@ -322,11 +367,33 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
   }

   private def listTestCases(): Seq[TestCase] = {
-    listFilesRecursively(new File(inputFilePath)).map { file =>
+    listFilesRecursively(new File(inputFilePath)).flatMap { file =>
       val resultFile = file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out"
       val absPath = file.getAbsolutePath
       val testCaseName = absPath.stripPrefix(inputFilePath).stripPrefix(File.separator)
-      TestCase(testCaseName, absPath, resultFile)
+
+      if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
+        Seq(
+          UDFTestCase(
+            s"$testCaseName - Scala UDF",
+            absPath,
+            resultFile,
+            TestScalaUDF(name = "udf")),
+
+          UDFTestCase(
+            s"$testCaseName - Python UDF",
+            absPath,
+            resultFile,
+            TestPythonUDF(name = "udf")),
+
+          UDFTestCase(
+            s"$testCaseName - Scalar Pandas UDF",
+            absPath,
+            resultFile,
+            TestScalarPandasUDF(name = "udf")))
+      } else {
+        RegularTestCase(testCaseName, absPath, resultFile) :: Nil
+      }
     }
   }
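As a reading aid for the golden-file changes in this diff, here is a minimal Python sketch of the `.sql.out` layout that `SQLQueryTestSuite` writes and reads back. Each file has a two-line header, followed by a `-- !query N`, `-- !query N schema` and `-- !query N output` segment for every query, and is parsed by splitting on `-- !query.+\n`. The sketch mirrors `QueryOutput.toString` and the `split` call visible above, but the Python names are hypothetical. Unlike Java's `String.split`, Python's `re.split` keeps trailing empty strings, which makes no difference for well-formed files.

```python
import re
from typing import List, NamedTuple


class QueryOutput(NamedTuple):
    sql: str
    schema: str
    output: str

    def to_string(self, query_index: int) -> str:
        # Same segment layout as QueryOutput.toString in SQLQueryTestSuite.
        return (f"-- !query {query_index}\n" + self.sql + "\n" +
                f"-- !query {query_index} schema\n" + self.schema + "\n" +
                f"-- !query {query_index} output\n" + self.output)


def render_golden_file(outputs: List[QueryOutput]) -> str:
    """Build the file body: header, then query blocks joined by blank lines."""
    header = ("-- Automatically generated by SQLQueryTestSuite\n"
              f"-- Number of queries: {len(outputs)}\n\n\n")
    body = "\n\n\n".join(qr.to_string(i) for i, qr in enumerate(outputs))
    return header + body + "\n"


def parse_golden_file(text: str) -> List[QueryOutput]:
    # Each query contributes 3 segments (sql, schema, output), plus one header segment.
    segments = re.split(r"-- !query.+\n", text)
    if (len(segments) - 1) % 3 != 0:
        raise ValueError(f"unexpected number of segments: {len(segments)}")
    return [QueryOutput(sql=segments[i * 3 + 1].strip(),
                        schema=segments[i * 3 + 2].strip(),
                        output=segments[i * 3 + 3].rstrip())  # like replaceAll("\\s+$", "")
            for i in range((len(segments) - 1) // 3)]


if __name__ == "__main__":
    sample = [QueryOutput(
        "CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a)",
        "struct<>", "")]
    print(render_golden_file(sample), end="")
```

A DDL statement has empty output, so its block ends with the three blank lines seen in `udf-inner-join.sql.out`.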