LuciferYang commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1273081014


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##########
@@ -211,6 +199,23 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
         throw error
       }
     }
+
+    addClientTestArtifactInServerClasspath(spark)

Review Comment:
   This should be moved into the `if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {` block.
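   For illustration, the suggested placement would look roughly like this (a sketch; the surrounding `if` block and setup code are assumed from this comment, not shown in the diff):
   
   ```scala
   // Sketch: run the artifact upload only on JVMs where the server is started.
   if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
     // ... existing server start-up and error handling ...
     addClientTestArtifactInServerClasspath(spark)
   }
   ```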



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {
+    // Session1 uses Stub SparkResult class
+    val session1 = spark.newSession()

Review Comment:
   Do we have to use different SparkSessions to avoid failures caused by duplicate artifact additions?



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {
+    // Session1 uses Stub SparkResult class
+    val session1 = spark.newSession()
+    addClientTestArtifactInServerClasspath(session1)
+    val ds = session1.range(10).filter(n => n % 2 == 0)
+
+    val rows = ds.collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+
+    // Session2 uses the real SparkResult class
+    val session2 = spark.newSession()
+    addClientTestArtifactInServerClasspath(session2)
+    addClientTestArtifactInServerClasspath(session2, testJar = false)
+    val rows2 = session2
+      .range(10)
+      .filter(n => {
+        // Try to use spark result
+        new SparkResult[Int](null, null, null)

Review Comment:
   should pass `timeZoneId`
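   Assuming the `SparkResult` constructor takes a `timeZoneId` string after the encoder (as this comment implies), the call would become something like:
   
   ```scala
   // Sketch only: the null placeholders mirror the original test;
   // "UTC" is an assumed example value for the timeZoneId parameter.
   new SparkResult[Int](null, null, null, "UTC")
   ```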



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -30,7 +30,7 @@ object IntegrationTestUtils {
   // System properties used for testing and debugging
   private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
  // Enable this flag to print all client debug log + server logs to the console
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "true").toBoolean

Review Comment:
   Will this change be reverted in the end?
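   If the goal is only easier debugging during development, one option is to keep the default at `"false"` and enable it per run via the system property (a sketch; assumes sbt forwards `-D` flags to the test JVM):
   
   ```scala
   // Default stays "false"; enable for a single run with, e.g.:
   //   build/sbt -Dspark.debug.sc.jvm.client=true "connect-client-jvm/testOnly ..."
   private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
   ```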
   
   



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the
+          |Scala UDF serdeser and execution if not found on the server classpath.

Review Comment:
   ```suggestion
             |Scala UDF serde and execution if not found on the server classpath.
   ```



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {

Review Comment:
   Locally running `build/sbt clean "connect-client-jvm/testOnly *UDFClassLoadingE2ESuite"` fails:
   
   ```
   [info] - update class loader after stubbing: new session *** FAILED *** (148 milliseconds)
   [info]   java.io.NotSerializableException: org.scalatest.Engine
   ```
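   The `org.scalatest.Engine` in the message suggests the filter lambda is capturing the enclosing suite (ScalaTest suites hold a non-serializable `Engine`). One way to avoid the capture, sketched with a hypothetical top-level helper object:
   
   ```scala
   // Hypothetical helper: a top-level object keeps the enclosing test suite
   // (and its non-serializable org.scalatest.Engine) out of the closure.
   object SerializableUdfs extends Serializable {
     val evenOnly: java.lang.Long => Boolean = n => n % 2 == 0
   }
   
   // usage inside the test:
   // session2.range(10).filter(SerializableUdfs.evenOnly)
   ```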



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the
+          |Scala UDF serdeser and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default the server stubs classes from the Scala client package.

Review Comment:
   ```suggestion
             |By default, the server stubs classes from the Scala client package.
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the

Review Comment:
   ```suggestion
             |Comma-separated list of binary names of classes/packages that should be stubbed during the
   ```
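   Applying all three doc-string suggestions together, the `.doc` block would read roughly as follows (a sketch combining the suggestions above, not an authoritative patch):
   
   ```scala
   .doc("""
       |Comma-separated list of binary names of classes/packages that should be stubbed during the
       |Scala UDF serde and execution if not found on the server classpath.
       |An empty list effectively disables stubbing for all missing classes.
       |By default, the server stubs classes from the Scala client package.
       |""".stripMargin)
   ```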



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

