This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new edd21069727 [SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to SparkListenerConnectOperationFinished
edd21069727 is described below

commit edd210697272c03e3d97a6443a65d0a130353c05
Author: Martin Grund <martin.gr...@databricks.com>
AuthorDate: Wed Aug 30 17:35:09 2023 +0200

    [SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to SparkListenerConnectOperationFinished
    
    ### What changes were proposed in this pull request?
    The `SparkListenerConnectOperationFinished` message supports passing extra tags, but the event method did not support them yet. This patch propagates the extra tags and adds a test for it.
    
    ### Why are the changes needed?
    Compatibility with the message interface.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42732 from grundprinzip/SPARK-43923.
    
    Authored-by: Martin Grund <martin.gr...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/connect/service/ExecuteEventsManager.scala    |  7 +++++--
 .../sql/connect/service/ExecuteEventsManagerSuite.scala     | 13 +++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 23a67b7292b..9e8a945bcc3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -205,7 +205,9 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
    *   Number of rows that are returned to the user. None is expected when the operation does not
    *   return any rows.
    */
-  def postFinished(producedRowsCountOpt: Option[Long] = None): Unit = {
+  def postFinished(
+      producedRowsCountOpt: Option[Long] = None,
+      extraTags: Map[String, String] = Map.empty): Unit = {
     assertStatus(
       List(ExecuteStatus.Started, ExecuteStatus.ReadyForExecution),
       ExecuteStatus.Finished)
@@ -217,7 +219,8 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
           jobTag,
           operationId,
           clock.getTimeMillis(),
-          producedRowCount))
+          producedRowCount,
+          extraTags))
   }
 
   /**
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
index 12e67f2c59c..dbe8420eab0 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
@@ -184,6 +184,19 @@ class ExecuteEventsManagerSuite
           Some(100)))
   }
 
+  test("SPARK-43923: post finished with extra tags") {
+    val events = setupEvents(ExecuteStatus.Started)
+    events.postFinished(Some(100), Map("someEvent" -> "true"))
+    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
+      .post(
+        SparkListenerConnectOperationFinished(
+          events.executeHolder.jobTag,
+          DEFAULT_QUERY_ID,
+          DEFAULT_CLOCK.getTimeMillis(),
+          Some(100),
+          Map("someEvent" -> "true")))
+  }
+
   test("SPARK-43923: post closed") {
     val events = setupEvents(ExecuteStatus.Finished)
     events.postClosed()
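
For reference, a minimal caller sketch of the updated API, assuming the `postFinished` signature shown in the diff above; the surrounding object and method names are hypothetical, for illustration only:

    import org.apache.spark.sql.connect.service.ExecuteEventsManager

    // Hypothetical helper showing how a caller could forward extra tags
    // to SparkListenerConnectOperationFinished via postFinished.
    object PostFinishedSketch {
      def reportFinished(events: ExecuteEventsManager, rowCount: Long): Unit = {
        events.postFinished(
          producedRowsCountOpt = Some(rowCount),
          extraTags = Map("someEvent" -> "true"))
      }
    }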

