This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new a7941f15a0c [SPARK-44861][CONNECT] jsonignore SparkListenerConnectOperationStarted.planRequest
a7941f15a0c is described below

commit a7941f15a0c3034888b1adbd5affce2a9e12788e
Author: jdesjean <jf.gauth...@databricks.com>
AuthorDate: Wed Aug 23 18:39:49 2023 +0200

[SPARK-44861][CONNECT] jsonignore SparkListenerConnectOperationStarted.planRequest

### What changes were proposed in this pull request?

Add `@JsonIgnore` to `SparkListenerConnectOperationStarted.planRequest`.

### Why are the changes needed?

`SparkListenerConnectOperationStarted` was added as part of [SPARK-43923](https://issues.apache.org/jira/browse/SPARK-43923). `SparkListenerConnectOperationStarted.planRequest` cannot be serialized to or deserialized from JSON because it contains recursive objects, which causes failures when attempting these operations.

```
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Direct self-reference leading to cycle (through reference chain: org.apache.spark.sql.connect.service.SparkListenerConnectOperationStarted["planRequest"]->org.apache.spark.connect.proto.ExecutePlanRequest["unknownFields"]->grpc_shaded.com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
  at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
  at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1308)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #42550 from jdesjean/SPARK-44861.
Authored-by: jdesjean <jf.gauth...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit dd6cda5b614b4ede418afb4c5b1fdeea9613d32c) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../sql/connect/service/ExecuteEventsManager.scala | 37 ++++--- .../service/ExecuteEventsManagerSuite.scala | 114 ++++++++++++++------- .../ui/SparkConnectServerListenerSuite.scala | 3 - .../connect/ui/SparkConnectServerPageSuite.scala | 1 - 4 files changed, 95 insertions(+), 60 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala index 5b9267a9679..23a67b7292b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala @@ -119,19 +119,19 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { s"${request.getPlan.getOpTypeCase} not supported.") } - listenerBus.post( - SparkListenerConnectOperationStarted( - jobTag, - operationId, - clock.getTimeMillis(), - sessionId, - request.getUserContext.getUserId, - request.getUserContext.getUserName, - Utils.redact( - sessionHolder.session.sessionState.conf.stringRedactionPattern, - ProtoUtils.abbreviate(plan, ExecuteEventsManager.MAX_STATEMENT_TEXT_SIZE).toString), - Some(request), - sparkSessionTags)) + val event = SparkListenerConnectOperationStarted( + jobTag, + operationId, + clock.getTimeMillis(), + sessionId, + request.getUserContext.getUserId, + request.getUserContext.getUserName, + Utils.redact( + sessionHolder.session.sessionState.conf.stringRedactionPattern, + ProtoUtils.abbreviate(plan, ExecuteEventsManager.MAX_STATEMENT_TEXT_SIZE).toString), + sparkSessionTags) + event.planRequest = Some(request) + 
listenerBus.post(event) } /** @@ -290,8 +290,6 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { * Opaque userName set in the Connect request. * @param statementText: * The connect request plan converted to text. - * @param planRequest: - * The Connect request. None if the operation is not of type @link proto.ExecutePlanRequest * @param sparkSessionTags: * Extra tags set by the user (via SparkSession.addTag). * @param extraTags: @@ -305,10 +303,15 @@ case class SparkListenerConnectOperationStarted( userId: String, userName: String, statementText: String, - planRequest: Option[proto.ExecutePlanRequest], sparkSessionTags: Set[String], extraTags: Map[String, String] = Map.empty) - extends SparkListenerEvent + extends SparkListenerEvent { + + /** + * The Connect request. None if the operation is not of type @link proto.ExecutePlanRequest + */ + @JsonIgnore var planRequest: Option[proto.ExecutePlanRequest] = None +} /** * The event is sent after a Connect request has been analyzed (@link diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala index 7950f9c5474..12e67f2c59c 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connect.planner.SparkConnectPlanTest import org.apache.spark.sql.internal.{SessionState, SQLConf} -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{JsonProtocol, ManualClock} class ExecuteEventsManagerSuite extends SparkFunSuite @@ -55,19 +55,25 @@ class ExecuteEventsManagerSuite 
test("SPARK-43923: post started") { val events = setupEvents(ExecuteStatus.Pending) events.postStarted() + val expectedEvent = SparkListenerConnectOperationStarted( + events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis(), + DEFAULT_SESSION_ID, + DEFAULT_USER_ID, + DEFAULT_USER_NAME, + DEFAULT_TEXT, + Set.empty, + Map.empty) + expectedEvent.planRequest = Some(events.executeHolder.request) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post(SparkListenerConnectOperationStarted( - events.executeHolder.jobTag, - DEFAULT_QUERY_ID, - DEFAULT_CLOCK.getTimeMillis(), - DEFAULT_SESSION_ID, - DEFAULT_USER_ID, - DEFAULT_USER_NAME, - DEFAULT_TEXT, - Some(events.executeHolder.request), - Set.empty, - Map.empty)) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationStarted]) } test("SPARK-43923: post analyzed with plan") { @@ -75,13 +81,18 @@ class ExecuteEventsManagerSuite val mockPlan = mock[LogicalPlan] events.postAnalyzed(Some(mockPlan)) - val event = SparkListenerConnectOperationAnalyzed( + val expectedEvent = SparkListenerConnectOperationAnalyzed( events.executeHolder.jobTag, DEFAULT_QUERY_ID, DEFAULT_CLOCK.getTimeMillis()) - event.analyzedPlan = Some(mockPlan) + expectedEvent.analyzedPlan = Some(mockPlan) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post(event) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationAnalyzed]) } test("SPARK-43923: post analyzed with empty plan") { @@ -98,47 +109,67 @@ class ExecuteEventsManagerSuite test("SPARK-43923: post readyForExecution") { val events = setupEvents(ExecuteStatus.Analyzed) events.postReadyForExecution() - val event = SparkListenerConnectOperationReadyForExecution( + val expectedEvent 
= SparkListenerConnectOperationReadyForExecution( events.executeHolder.jobTag, DEFAULT_QUERY_ID, DEFAULT_CLOCK.getTimeMillis()) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post(event) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationReadyForExecution]) } test("SPARK-43923: post canceled") { val events = setupEvents(ExecuteStatus.Started) events.postCanceled() + val expectedEvent = SparkListenerConnectOperationCanceled( + events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis()) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post( - SparkListenerConnectOperationCanceled( - events.executeHolder.jobTag, - DEFAULT_QUERY_ID, - DEFAULT_CLOCK.getTimeMillis())) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationCanceled]) } test("SPARK-43923: post failed") { val events = setupEvents(ExecuteStatus.Started) events.postFailed(DEFAULT_ERROR) + val expectedEvent = SparkListenerConnectOperationFailed( + events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis(), + DEFAULT_ERROR, + Map.empty[String, String]) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post( - SparkListenerConnectOperationFailed( - events.executeHolder.jobTag, - DEFAULT_QUERY_ID, - DEFAULT_CLOCK.getTimeMillis(), - DEFAULT_ERROR, - Map.empty[String, String])) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationFailed]) } test("SPARK-43923: post finished") { val events = setupEvents(ExecuteStatus.Started) events.postFinished() + val expectedEvent = SparkListenerConnectOperationFinished( + 
events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis()) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post( - SparkListenerConnectOperationFinished( - events.executeHolder.jobTag, - DEFAULT_QUERY_ID, - DEFAULT_CLOCK.getTimeMillis())) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationFinished]) } test("SPARK-44776: post finished with row number") { @@ -156,12 +187,17 @@ class ExecuteEventsManagerSuite test("SPARK-43923: post closed") { val events = setupEvents(ExecuteStatus.Finished) events.postClosed() + val expectedEvent = SparkListenerConnectOperationClosed( + events.executeHolder.jobTag, + DEFAULT_QUERY_ID, + DEFAULT_CLOCK.getTimeMillis()) verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1)) - .post( - SparkListenerConnectOperationClosed( - events.executeHolder.jobTag, - DEFAULT_QUERY_ID, - DEFAULT_CLOCK.getTimeMillis())) + .post(expectedEvent) + + assert( + JsonProtocol + .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent)) + .isInstanceOf[SparkListenerConnectOperationClosed]) } test("SPARK-43923: Closed wrong order throws exception") { diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala index 7cdc0135201..3b75c37b2aa 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala @@ -61,7 +61,6 @@ class SparkConnectServerListenerSuite "userId", "userName", "dummy query", - None, Set())) listener.onOtherEvent( SparkListenerConnectOperationAnalyzed(jobTag, "operationId", 
System.currentTimeMillis())) @@ -153,7 +152,6 @@ class SparkConnectServerListenerSuite "userId", "userName", "dummy query", - None, Set())) listener.onOtherEvent( SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis())) @@ -199,7 +197,6 @@ class SparkConnectServerListenerSuite "userId", "userName", "dummy query", - None, Set())) listener.onOtherEvent( SparkListenerConnectOperationAnalyzed( diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala index 99d0a14f1e8..d352f4e2c32 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala @@ -65,7 +65,6 @@ class SparkConnectServerPageSuite "userId", "userName", "dummy query", - None, Set())) listener.onOtherEvent( SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", System.currentTimeMillis())) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org