This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 78a6b46b5f8 [SPARK-44794][CONNECT] Make Streaming Queries work with Connect's artifact management
78a6b46b5f8 is described below

commit 78a6b46b5f87f7a4b86e1d2ce010069a9027bdf9
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Tue Aug 15 19:02:31 2023 +0200

    [SPARK-44794][CONNECT] Make Streaming Queries work with Connect's artifact management
    
    ### What changes were proposed in this pull request?
    When you try to run a streaming query that contains a UDF from the REPL,
    for example:
    ```scala
    val add1 = udf((i: Long) => i + 1)
    val query = spark.readStream
        .format("rate")
        .option("rowsPerSecond", "10")
        .option("numPartitions", "1")
        .load()
        .withColumn("value", add1($"value"))
        .writeStream
        .format("memory")
        .queryName("my_sink")
        .start()
    ```
    You are currently greeted by a hard-to-understand deserialization failure
    in which a serialization proxy cannot be assigned to a field. The
    underlying cause is a `ClassNotFoundException` (yes, Java serialization is
    weird). This `ClassNotFoundException` is caused by us not properly
    propagating the `JobArtifactState` (which indirectly contains the location
    of REPL-generated classes and session-local libraries) to the streaming
    query execution thread.
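    
    To see why a missing class surfaces this way: Java deserialization
    resolves every class named in the stream through a class loader. A
    minimal, illustrative sketch (not part of this patch) of making that
    loader explicit:
    ```scala
    import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

    // Illustrative only: an ObjectInputStream that resolves classes through a
    // caller-supplied loader. If `loader` cannot see REPL-generated classes,
    // resolveClass throws ClassNotFoundException, which surfaces as the
    // confusing "serialization proxy cannot be assigned" error above.
    class LoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
        extends ObjectInputStream(in) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    ```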
    
    This PR fixes this by propagating the `JobArtifactState` into the stream
    execution thread.
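    
    Condensed from the diff below, the shape of the fix is: capture the
    artifact state on the thread that constructs the query, then re-activate
    it on the query execution thread.
    ```scala
    import org.apache.spark.JobArtifactSet

    // Capture the caller thread's artifact state when StreamExecution is created.
    private val jobArtifactState = JobArtifactSet.getCurrentJobArtifactState.orNull

    // Later, on the micro-batch thread, run the stream with that state active so
    // REPL-generated classes and session-local libraries can be resolved.
    JobArtifactSet.withActiveJobArtifactState(jobArtifactState) {
      runStream()
    }
    ```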
    
    ### Why are the changes needed?
    It is a bug. We want streaming to work with Connect's isolated dependencies.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    I added a test to `ReplE2ESuite`.
    
    Closes #42476 from hvanhovell/SPARK-44794.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 2ab404ff179ae04e86d4441fa8c28c08f95e0d0f)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/application/ReplE2ESuite.scala       | 28 ++++++++++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala  |  9 +++++--
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 191fd851482..13ca5caf0af 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -288,4 +288,32 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
     val output = runCommandsInShell(input)
     assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output)
   }
+
+  test("streaming works with REPL generated code") {
+    val input =
+      """
+        |val add1 = udf((i: Long) => i + 1)
+        |val query = {
+        |  spark.readStream
+        |      .format("rate")
+        |      .option("rowsPerSecond", "10")
+        |      .option("numPartitions", "1")
+        |      .load()
+        |      .withColumn("value", add1($"value"))
+        |      .writeStream
+        |      .format("memory")
+        |      .queryName("my_sink")
+        |      .start()
+        |}
+        |var progress = query.lastProgress
+        |while (query.isActive && (progress == null || progress.numInputRows == 0)) {
+        |  query.awaitTermination(100)
+        |  progress = query.lastProgress
+        |}
+        |val noException = query.exception.isEmpty
+        |query.stop()
+        |""".stripMargin
+    val output = runCommandsInShell(input)
+    assertContains("noException: Boolean = true", output)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 88fb28f0da4..936de41af76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -186,6 +186,9 @@ abstract class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()
 
+  /* Make sure we propagate the ArtifactSet to the micro batch thread. */
+  private val jobArtifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
+
   /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
   lazy val streamMetrics = new MetricsReporter(
     this, s"spark.streaming.${Option(name).getOrElse(id)}")
@@ -204,7 +207,9 @@ abstract class StreamExecution(
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
         // thread to this micro batch thread
         sparkSession.sparkContext.setCallSite(callSite)
-        runStream()
+        JobArtifactSet.withActiveJobArtifactState(jobArtifactState) {
+          runStream()
+        }
       }
     }
 

