This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new abb1f0c  [SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax
abb1f0c is described below

commit abb1f0c5d7e78b06dd5f2bf6856d2baf97f95b10
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Wed Apr 28 10:15:21 2021 +0900

    [SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make the `CREATE FUNCTION USING` syntax able to take archives as resources.
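
    Concretely, `SessionResourceLoader.loadResource` now dispatches `ArchiveResource` to `SparkContext.addArchive` instead of throwing an `AnalysisException`. A condensed view of the change (see the full diff below):

    ```scala
    resource.resourceType match {
      case JarResource => addJar(resource.uri)
      case FileResource => session.sparkContext.addFile(resource.uri)
      // New: archives are handed to SparkContext.addArchive, which
      // distributes and unpacks them on every node.
      case ArchiveResource => session.sparkContext.addArchive(resource.uri)
    }
    ```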
    
    ### Why are the changes needed?
    
    The `CREATE FUNCTION USING` syntax does not support archives as resources because archives were previously not supported in Spark SQL.
    Now that Spark SQL supports archives, this syntax can support them as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now specify archives with the `CREATE FUNCTION ... USING ARCHIVE` syntax.
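
    For example, a minimal sketch (the UDF class name, archive paths, and alias below are illustrative placeholders, not part of this change):

    ```scala
    // Register a function whose implementation reads resources from an
    // archive; Spark distributes and unpacks the archive on every node.
    spark.sql(
      "CREATE FUNCTION my_func AS 'com.example.MyUDF' " +
        "USING ARCHIVE '/tmp/resources.zip'")

    // With a URI fragment, the archive is registered under an alias; its
    // unpacked location is resolvable on executors via SparkFiles.get("my_alias").
    spark.sql(
      "CREATE FUNCTION my_func2 AS 'com.example.MyUDF' " +
        "USING ARCHIVE '/tmp/resources.zip#my_alias'")
    ```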
    
    ### How was this patch tested?
    
    New test in `HiveUDFSuite`.
    
    Closes #32359 from sarutak/load-function-using-archive.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: hyukjinkwon <gurwls...@apache.org>
---
 .../apache/spark/sql/internal/SessionState.scala   |  5 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala    | 53 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 319b226..79fbca6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -150,10 +150,7 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
     resource.resourceType match {
       case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
-      case ArchiveResource =>
-        throw new AnalysisException(
-          "Archive is not allowed to be loaded. If YARN mode is used, " +
-            "please use --archives options while calling spark-submit.")
+      case ArchiveResource => session.sparkContext.addArchive(resource.uri)
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 9e8046b9..f988287 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution
 import java.io.{DataInput, DataOutput, File, PrintWriter}
 import java.util.{ArrayList, Arrays, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.UDF
 import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
@@ -30,6 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
 import org.apache.hadoop.io.{LongWritable, Writable}
 
+import org.apache.spark.{SparkFiles, TestUtils}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.command.FunctionsCommand
@@ -676,6 +679,47 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       assert(msg2.contains(s"No handler for UDF/UDAF/UDTF '${classOf[ArraySumUDF].getName}'"))
     }
   }
+
+  test("SPARK-35236: CREATE FUNCTION should take an archive in USING clause") {
+    withTempDir { dir =>
+      withUserDefinedFunction("testListFiles1" -> false) {
+        val text1 = File.createTempFile("test1_", ".txt", dir)
+        val json1 = File.createTempFile("test1_", ".json", dir)
+        val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+        TestUtils.createJar(Seq(text1, json1), zipFile1)
+
+        sql(s"CREATE FUNCTION testListFiles1 AS 
'${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile1.getAbsolutePath}'")
+        val df1 = sql(s"SELECT testListFiles1('${SparkFiles.get(zipFile1.getName)}')")
+        val fileList1 =
+          df1.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList1.length === 2)
+        assert(fileList1.contains(text1.getName))
+        assert(fileList1.contains(json1.getName))
+      }
+
+      // Test for file#alias style archive registration.
+      withUserDefinedFunction("testListFiles2" -> false) {
+        val text2 = File.createTempFile("test2_", ".txt", dir)
+        val json2 = File.createTempFile("test2_", ".json", dir)
+        val csv2 = File.createTempFile("test2", ".csv", dir)
+        val zipFile2 = File.createTempFile("test2_", ".zip", dir)
+        TestUtils.createJar(Seq(text2, json2, csv2), zipFile2)
+
+        sql(s"CREATE FUNCTION testListFiles2 AS 
'${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile2.getAbsolutePath}#foo'")
+        val df2 = sql(s"SELECT testListFiles2('${SparkFiles.get("foo")}')")
+        val fileList2 =
+          df2.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList2.length === 3)
+        assert(fileList2.contains(text2.getName))
+        assert(fileList2.contains(json2.getName))
+        assert(fileList2.contains(csv2.getName))
+      }
+    }
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
@@ -761,7 +805,6 @@ class StatelessUDF extends UDF {
 }
 
 class ArraySumUDF extends UDF {
-  import scala.collection.JavaConverters._
   def evaluate(values: java.util.List[java.lang.Double]): java.lang.Double = {
     var r = 0d
     for (v <- values.asScala) {
@@ -770,3 +813,11 @@ class ArraySumUDF extends UDF {
     r
   }
 }
+
+class ListFiles extends UDF {
+  import java.util.{ArrayList, Arrays, List => JList}
+  def evaluate(path: String): JList[String] = {
+    val fileArray = new File(path).list()
+    if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]()
+  }
+}
