This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new edd21069727 [SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to SparkListenerConnectOperationFinished edd21069727 is described below commit edd210697272c03e3d97a6443a65d0a130353c05 Author: Martin Grund <martin.gr...@databricks.com> AuthorDate: Wed Aug 30 17:35:09 2023 +0200 [SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to SparkListenerConnectOperationFinished ### What changes were proposed in this pull request? The `SparkListenerConnectOperationFinished` message supports passing extra tags, but the event method did not support them yet. This patch propagates the extra tags and adds a test for it. ### Why are the changes needed? Compatibility with the message interface. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #42732 from grundprinzip/SPARK-43923. Authored-by: Martin Grund <martin.gr...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../spark/sql/connect/service/ExecuteEventsManager.scala | 7 +++++-- .../sql/connect/service/ExecuteEventsManagerSuite.scala | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala index 23a67b7292b..9e8a945bcc3 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala @@ -205,7 +205,9 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { * Number of rows that are returned to the user. None is expected when the operation does not * return any rows. */ - def postFinished(producedRowsCountOpt: Option[Long] = None): Unit = { + def postFinished( + producedRowsCountOpt: Option[Long] = None, + extraTags: Map[String, String] = Map.empty): Unit = { assertStatus( List(ExecuteStatus.Started, ExecuteStatus.ReadyForExecution), ExecuteStatus.Finished) @@ -217,7 +219,8 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { jobTag, operationId, clock.getTimeMillis(), - producedRowCount)) + producedRowCount, + extraTags)) } /** diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala index 12e67f2c59c..dbe8420eab0 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala @@ -184,6 +184,19 @@ class ExecuteEventsManagerSuite Some(100))) } + test("SPARK-43923: post finished with extra tags") { + val events = setupEvents(ExecuteStatus.Started) + events.postFinished(Some(100), Map("someEvent" -> "true")) + verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) + .post( + SparkListenerConnectOperationFinished( + events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis(), + Some(100), + Map("someEvent" -> "true"))) + } + test("SPARK-43923: post closed") { val events = setupEvents(ExecuteStatus.Finished) events.postClosed() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org