LuciferYang commented on code in PR #42069: URL: https://github.com/apache/spark/pull/42069#discussion_r1273081014
########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala: ########## @@ -211,6 +199,23 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll { throw error } } + + addClientTestArtifactInServerClasspath(spark) Review Comment: should move into `if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {` block, ########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import java.util.Arrays + +import org.apache.spark.sql.connect.client.SparkResult +import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class UDFClassLoadingE2ESuite extends RemoteSparkSession { + + test("load udf with default stub class loader") { + val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList() + assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8)) + } + + test("update class loader after stubbing: new session") { + // Session1 uses Stub SparkResult class + val session1 = spark.newSession() Review Comment: Do we have to use different SparkSessions in order to avoid failures caused by duplicate Artifact additions? ########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import java.util.Arrays + +import org.apache.spark.sql.connect.client.SparkResult +import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class UDFClassLoadingE2ESuite extends RemoteSparkSession { + + test("load udf with default stub class loader") { + val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList() + assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8)) + } + + test("update class loader after stubbing: new session") { + // Session1 uses Stub SparkResult class + val session1 = spark.newSession() + addClientTestArtifactInServerClasspath(session1) + val ds = session1.range(10).filter(n => n % 2 == 0) + + val rows = ds.collectAsList() + assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8)) + + // Session2 uses the real SparkResult class + val session2 = spark.newSession() + addClientTestArtifactInServerClasspath(session2) + addClientTestArtifactInServerClasspath(session2, testJar = false) + val rows2 = session2 + .range(10) + .filter(n => { + // Try to use spark result + new SparkResult[Int](null, null, null) Review Comment: should pass `timeZoneId` ########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala: ########## @@ -30,7 +30,7 @@ object IntegrationTestUtils { // System properties used for testing and debugging private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client" // Enable this flag to print all client debug log + server logs to the console - private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean + private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "true").toBoolean Review Comment: Will this change be reversed in the end? 
########## core/src/main/scala/org/apache/spark/internal/config/package.scala: ########## @@ -2547,4 +2547,18 @@ package object config { .version("3.5.0") .booleanConf .createWithDefault(false) + + private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES = + ConfigBuilder("spark.connect.scalaUdf.stubClasses") + .internal() + .doc(""" + |Comma separated list of binary names of classes/packages that should be stub during the + |Scala UDF serdeser and execution if not found on the server classpath. Review Comment: ```suggestion |Scala UDF serde and execution if not found on the server classpath. ``` ########## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import java.util.Arrays + +import org.apache.spark.sql.connect.client.SparkResult +import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class UDFClassLoadingE2ESuite extends RemoteSparkSession { + + test("load udf with default stub class loader") { + val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList() + assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8)) + } + + test("update class loader after stubbing: new session") { Review Comment: locally run `build/sbt clean "connect-client-jvm/testOnly *UDFClassLoadingE2ESuite"` ``` [info] - update class loader after stubbing: new session *** FAILED *** (148 milliseconds) [info] java.io.NotSerializableException: org.scalatest.Engine ``` ########## core/src/main/scala/org/apache/spark/internal/config/package.scala: ########## @@ -2547,4 +2547,18 @@ package object config { .version("3.5.0") .booleanConf .createWithDefault(false) + + private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES = + ConfigBuilder("spark.connect.scalaUdf.stubClasses") + .internal() + .doc(""" + |Comma separated list of binary names of classes/packages that should be stub during the + |Scala UDF serdeser and execution if not found on the server classpath. + |An empty list effectively disables stubbing for all missing classes. + |By default the server stubs classes from the Scala client package. Review Comment: ```suggestion |By default, the server stubs classes from the Scala client package. 
``` ########## core/src/main/scala/org/apache/spark/internal/config/package.scala: ########## @@ -2547,4 +2547,18 @@ package object config { .version("3.5.0") .booleanConf .createWithDefault(false) + + private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES = + ConfigBuilder("spark.connect.scalaUdf.stubClasses") + .internal() + .doc(""" + |Comma separated list of binary names of classes/packages that should be stub during the Review Comment: ```suggestion |Comma-separated list of binary names of classes/packages that should be stubbed during the ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org