This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 78a6b46b5f8  [SPARK-44794][CONNECT] Make Streaming Queries work with Connect's artifact management
78a6b46b5f8 is described below

commit 78a6b46b5f87f7a4b86e1d2ce010069a9027bdf9
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Tue Aug 15 19:02:31 2023 +0200

[SPARK-44794][CONNECT] Make Streaming Queries work with Connect's artifact management

### What changes were proposed in this pull request?
When you try to run a streaming query that contains a UDF from the REPL, for example:
```scala
val add1 = udf((i: Long) => i + 1)
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("numPartitions", "1")
  .load()
  .withColumn("value", add1($"value"))
  .writeStream
  .format("memory")
  .queryName("my_sink")
  .start()
```
you are currently greeted by a hard-to-understand deserialization failure, in which a serialization proxy cannot be assigned to a field. The underlying cause is a `ClassNotFoundException` (yes, Java serialization is weird). The `ClassNotFoundException` arises because we do not properly propagate the `JobArtifactState` (which, indirectly, records the location of REPL-generated classes and session-local libraries) to the streaming query execution thread.

This PR fixes this by propagating the `JobArtifactState` into the stream execution thread.
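For context, the fix follows the standard capture-and-restore pattern for thread-local state: capture the state on the thread that constructs the query, and reinstall it on the micro-batch thread before it does any work. Below is a minimal, self-contained sketch of that pattern; `ArtifactState`, `currentState`, and `runWithCallerState` are hypothetical stand-ins for illustration, not Spark APIs (the real calls, visible in the diff below, are `JobArtifactSet.getCurrentJobArtifactState` and `JobArtifactSet.withActiveJobArtifactState`):
```scala
// Minimal sketch of capture-and-restore for thread-local state.
// ArtifactState and currentState are hypothetical stand-ins for
// JobArtifactState and its holder; they are not Spark APIs.
object StatePropagation {
  case class ArtifactState(uuid: String, replClassDirUri: Option[String])

  // Holds the state that is active on the current thread, if any.
  private val currentState = new ThreadLocal[ArtifactState]

  def setCurrent(state: ArtifactState): Unit = currentState.set(state)

  // Capture the caller's state, then run `body` on a new thread with
  // that state made active there for the duration of the work.
  def runWithCallerState(body: => Unit): Thread = {
    val captured = currentState.get() // read on the caller thread
    val worker = new Thread(() => {
      currentState.set(captured)      // reinstall on the worker thread
      try body
      finally currentState.remove()   // don't leak state across thread reuse
    })
    worker.start()
    worker
  }
}
```
The `StreamExecution` change below has the same shape: the state is captured in a `val` when the query object is constructed on the caller thread, and wrapped around `runStream()` on the micro-batch thread.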
### Why are the changes needed?
It is a bug fix: we want streaming to work with Connect's isolated dependencies.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
I added a test to `ReplE2ESuite`.

Closes #42476 from hvanhovell/SPARK-44794.

Authored-by: Herman van Hovell <her...@databricks.com>
Signed-off-by: Herman van Hovell <her...@databricks.com>
(cherry picked from commit 2ab404ff179ae04e86d4441fa8c28c08f95e0d0f)
Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/application/ReplE2ESuite.scala       | 28 ++++++++++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala  |  9 +++++--
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 191fd851482..13ca5caf0af 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -288,4 +288,32 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
     val output = runCommandsInShell(input)
     assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output)
   }
+
+  test("streaming works with REPL generated code") {
+    val input =
+      """
+        |val add1 = udf((i: Long) => i + 1)
+        |val query = {
+        |  spark.readStream
+        |    .format("rate")
+        |    .option("rowsPerSecond", "10")
+        |    .option("numPartitions", "1")
+        |    .load()
+        |    .withColumn("value", add1($"value"))
+        |    .writeStream
+        |    .format("memory")
+        |    .queryName("my_sink")
+        |    .start()
+        |}
+        |var progress = query.lastProgress
+        |while (query.isActive && (progress == null || progress.numInputRows == 0)) {
+        |  query.awaitTermination(100)
+        |  progress = query.lastProgress
+        |}
+        |val noException = query.exception.isEmpty
+        |query.stop()
+        |""".stripMargin
+    val output = runCommandsInShell(input)
+    assertContains("noException: Boolean = true", output)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 88fb28f0da4..936de41af76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -186,6 +186,9 @@ abstract class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()
 
+  /* Make sure we propagate the ArtifactSet to the micro batch thread. */
+  private val jobArtifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
+
   /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
   lazy val streamMetrics = new MetricsReporter(
     this, s"spark.streaming.${Option(name).getOrElse(id)}")
@@ -204,7 +207,9 @@ abstract class StreamExecution(
       // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
       // thread to this micro batch thread
       sparkSession.sparkContext.setCallSite(callSite)
-      runStream()
+      JobArtifactSet.withActiveJobArtifactState(jobArtifactState) {
+        runStream()
+      }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org