This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new abb1f0c  [SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax
abb1f0c is described below

commit abb1f0c5d7e78b06dd5f2bf6856d2baf97f95b10
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Wed Apr 28 10:15:21 2021 +0900

    [SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax

    ### What changes were proposed in this pull request?

    This PR proposes to let the `CREATE FUNCTION USING` syntax take archives as resources.

    ### Why are the changes needed?

    The `CREATE FUNCTION USING` syntax has not supported archives as resources, because archives themselves were not supported in Spark SQL. Now that Spark SQL supports archives, this syntax can support them as well.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Users can specify archives in the `CREATE FUNCTION USING` syntax.
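
    For example (illustrative only; the UDF class `my.pkg.MyUDF` and the
    paths below are placeholders, not taken from this patch):

        CREATE FUNCTION my_func AS 'my.pkg.MyUDF'
          USING JAR '/path/to/my_udf.jar', ARCHIVE '/path/to/data.zip';

        -- A '#' fragment renames the directory the archive is unpacked
        -- into, so the function can locate it via SparkFiles.get("mydata"):
        CREATE FUNCTION my_func2 AS 'my.pkg.MyUDF'
          USING ARCHIVE '/path/to/data.zip#mydata';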

    ### How was this patch tested?

    New test.

    Closes #32359 from sarutak/load-function-using-archive.

    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: hyukjinkwon <gurwls...@apache.org>
---
 .../apache/spark/sql/internal/SessionState.scala   |  5 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala    | 53 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 319b226..79fbca6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -150,10 +150,7 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
     resource.resourceType match {
       case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
-      case ArchiveResource =>
-        throw new AnalysisException(
-          "Archive is not allowed to be loaded. If YARN mode is used, " +
-          "please use --archives options while calling spark-submit.")
+      case ArchiveResource => session.sparkContext.addArchive(resource.uri)
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 9e8046b9..f988287 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution
 import java.io.{DataInput, DataOutput, File, PrintWriter}
 import java.util.{ArrayList, Arrays, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.UDF
 import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
@@ -30,6 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
 import org.apache.hadoop.io.{LongWritable, Writable}
 
+import org.apache.spark.{SparkFiles, TestUtils}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.command.FunctionsCommand
@@ -676,6 +679,47 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       assert(msg2.contains(s"No handler for UDF/UDAF/UDTF '${classOf[ArraySumUDF].getName}'"))
     }
   }
+
+  test("SPARK-35236: CREATE FUNCTION should take an archive in USING clause") {
+    withTempDir { dir =>
+      withUserDefinedFunction("testListFiles1" -> false) {
+        val text1 = File.createTempFile("test1_", ".txt", dir)
+        val json1 = File.createTempFile("test1_", ".json", dir)
+        val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+        TestUtils.createJar(Seq(text1, json1), zipFile1)
+
+        sql(s"CREATE FUNCTION testListFiles1 AS '${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile1.getAbsolutePath}'")
+        val df1 = sql(s"SELECT testListFiles1('${SparkFiles.get(zipFile1.getName)}')")
+        val fileList1 =
+          df1.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList1.length === 2)
+        assert(fileList1.contains(text1.getName))
+        assert(fileList1.contains(json1.getName))
+      }
+
+      // Test for file#alias style archive registration.
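+      // The '#foo' fragment below renames the directory the archive is
+      // unpacked into, so the UDF resolves it via SparkFiles.get("foo")
+      // rather than the archive's original file name.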
+      withUserDefinedFunction("testListFiles2" -> false) {
+        val text2 = File.createTempFile("test2_", ".txt", dir)
+        val json2 = File.createTempFile("test2_", ".json", dir)
+        val csv2 = File.createTempFile("test2", ".csv", dir)
+        val zipFile2 = File.createTempFile("test2_", ".zip", dir)
+        TestUtils.createJar(Seq(text2, json2, csv2), zipFile2)
+
+        sql(s"CREATE FUNCTION testListFiles2 AS '${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile2.getAbsolutePath}#foo'")
+        val df2 = sql(s"SELECT testListFiles2('${SparkFiles.get("foo")}')")
+        val fileList2 =
+          df2.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList2.length === 3)
+        assert(fileList2.contains(text2.getName))
+        assert(fileList2.contains(json2.getName))
+        assert(fileList2.contains(csv2.getName))
+      }
+    }
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
@@ -761,7 +805,6 @@ class StatelessUDF extends UDF {
 }
 
 class ArraySumUDF extends UDF {
-  import scala.collection.JavaConverters._
   def evaluate(values: java.util.List[java.lang.Double]): java.lang.Double = {
     var r = 0d
     for (v <- values.asScala) {
@@ -770,3 +813,11 @@
     r
   }
 }
+
+class ListFiles extends UDF {
+  import java.util.{ArrayList, Arrays, List => JList}
+  def evaluate(path: String): JList[String] = {
+    val fileArray = new File(path).list()
+    if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]()
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org