This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6cda5b614 [SPARK-44861][CONNECT] jsonignore SparkListenerConnectOperationStarted.planRequest
dd6cda5b614 is described below

commit dd6cda5b614b4ede418afb4c5b1fdeea9613d32c
Author: jdesjean <jf.gauth...@databricks.com>
AuthorDate: Wed Aug 23 18:39:49 2023 +0200

    [SPARK-44861][CONNECT] jsonignore SparkListenerConnectOperationStarted.planRequest
    
    ### What changes were proposed in this pull request?
    Add `@JsonIgnore` to `SparkListenerConnectOperationStarted.planRequest`
    
    ### Why are the changes needed?
    `SparkListenerConnectOperationStarted` was added as part of [SPARK-43923](https://issues.apache.org/jira/browse/SPARK-43923).
    `SparkListenerConnectOperationStarted.planRequest` cannot be serialized to or deserialized from JSON because the protobuf request contains self-referential objects, so both operations fail:
    ```
    com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Direct self-reference leading to cycle (through reference chain: org.apache.spark.sql.connect.service.SparkListenerConnectOperationStarted["planRequest"]->org.apache.spark.connect.proto.ExecutePlanRequest["unknownFields"]->grpc_shaded.com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
            at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
            at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1308)
    ```
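    
    For illustration, a minimal, self-contained Scala sketch of the same failure mode and the `@JsonIgnore` escape hatch. The class names (`SelfRef`, `SafeRef`, `JsonIgnoreDemo`) are invented for this example and are not part of the patch; only jackson-databind is assumed on the classpath.
    ```scala
    import com.fasterxml.jackson.annotation.JsonIgnore
    import com.fasterxml.jackson.databind.ObjectMapper

    // Mimics protobuf's getDefaultInstanceForType: a getter that returns the
    // enclosing instance, which Jackson rejects as a direct self-reference.
    class SelfRef {
      def getName: String = "selfRef"
      def getSelf: SelfRef = this
    }

    // Same shape, but the self-referential property is excluded from JSON.
    class SafeRef {
      def getName: String = "safeRef"
      @JsonIgnore def getSelf: SafeRef = this
    }

    object JsonIgnoreDemo {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        // Fails with InvalidDefinitionException: Direct self-reference leading to cycle.
        try mapper.writeValueAsString(new SelfRef)
        catch { case e: Exception => println(e.getClass.getSimpleName) }
        // Succeeds once the cycle is ignored: {"name":"safeRef"}
        println(mapper.writeValueAsString(new SafeRef))
      }
    }
    ```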
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests; `ExecuteEventsManagerSuite` now round-trips each event through `JsonProtocol` to guard against serialization regressions.
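    
    A hedged sketch of the round-trip style those new assertions use. Plain Jackson (with jackson-module-scala's `DefaultScalaModule`) stands in for Spark's internal `JsonProtocol` helpers, and `Probe` is a hypothetical event stand-in:
    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical stand-in for a listener event; the suite round-trips the
    // real SparkListenerConnectOperation* events through JsonProtocol instead.
    case class Probe(name: String, count: Int)

    object RoundTripCheck {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
        val original = Probe("started", 1)
        val json = mapper.writeValueAsString(original)
        // Deserialize and verify the event survives the round trip.
        val restored = mapper.readValue(json, classOf[Probe])
        assert(restored == original)
      }
    }
    ```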
    
    Closes #42550 from jdesjean/SPARK-44861.
    
    Authored-by: jdesjean <jf.gauth...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../sql/connect/service/ExecuteEventsManager.scala |  37 ++++---
 .../service/ExecuteEventsManagerSuite.scala        | 114 ++++++++++++++-------
 .../ui/SparkConnectServerListenerSuite.scala       |   3 -
 .../connect/ui/SparkConnectServerPageSuite.scala   |   1 -
 4 files changed, 95 insertions(+), 60 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 5b9267a9679..23a67b7292b 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -119,19 +119,19 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
             s"${request.getPlan.getOpTypeCase} not supported.")
       }
 
-    listenerBus.post(
-      SparkListenerConnectOperationStarted(
-        jobTag,
-        operationId,
-        clock.getTimeMillis(),
-        sessionId,
-        request.getUserContext.getUserId,
-        request.getUserContext.getUserName,
-        Utils.redact(
-          sessionHolder.session.sessionState.conf.stringRedactionPattern,
-          ProtoUtils.abbreviate(plan, ExecuteEventsManager.MAX_STATEMENT_TEXT_SIZE).toString),
-        Some(request),
-        sparkSessionTags))
+    val event = SparkListenerConnectOperationStarted(
+      jobTag,
+      operationId,
+      clock.getTimeMillis(),
+      sessionId,
+      request.getUserContext.getUserId,
+      request.getUserContext.getUserName,
+      Utils.redact(
+        sessionHolder.session.sessionState.conf.stringRedactionPattern,
+        ProtoUtils.abbreviate(plan, ExecuteEventsManager.MAX_STATEMENT_TEXT_SIZE).toString),
+      sparkSessionTags)
+    event.planRequest = Some(request)
+    listenerBus.post(event)
   }
 
   /**
@@ -290,8 +290,6 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
  *   Opaque userName set in the Connect request.
  * @param statementText:
  *   The connect request plan converted to text.
- * @param planRequest:
- *   The Connect request. None if the operation is not of type @link proto.ExecutePlanRequest
  * @param sparkSessionTags:
  *   Extra tags set by the user (via SparkSession.addTag).
  * @param extraTags:
@@ -305,10 +303,15 @@ case class SparkListenerConnectOperationStarted(
     userId: String,
     userName: String,
     statementText: String,
-    planRequest: Option[proto.ExecutePlanRequest],
     sparkSessionTags: Set[String],
     extraTags: Map[String, String] = Map.empty)
-    extends SparkListenerEvent
+    extends SparkListenerEvent {
+
+  /**
+   * The Connect request. None if the operation is not of type @link proto.ExecutePlanRequest
+   */
+  @JsonIgnore var planRequest: Option[proto.ExecutePlanRequest] = None
+}
 
 /**
  * The event is sent after a Connect request has been analyzed (@link
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
index 7950f9c5474..12e67f2c59c 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{JsonProtocol, ManualClock}
 
 class ExecuteEventsManagerSuite
     extends SparkFunSuite
@@ -55,19 +55,25 @@ class ExecuteEventsManagerSuite
   test("SPARK-43923: post started") {
     val events = setupEvents(ExecuteStatus.Pending)
     events.postStarted()
+    val expectedEvent = SparkListenerConnectOperationStarted(
+      events.executeHolder.jobTag,
+      DEFAULT_QUERY_ID,
+      DEFAULT_CLOCK.getTimeMillis(),
+      DEFAULT_SESSION_ID,
+      DEFAULT_USER_ID,
+      DEFAULT_USER_NAME,
+      DEFAULT_TEXT,
+      Set.empty,
+      Map.empty)
+    expectedEvent.planRequest = Some(events.executeHolder.request)

    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(SparkListenerConnectOperationStarted(
-        events.executeHolder.jobTag,
-        DEFAULT_QUERY_ID,
-        DEFAULT_CLOCK.getTimeMillis(),
-        DEFAULT_SESSION_ID,
-        DEFAULT_USER_ID,
-        DEFAULT_USER_NAME,
-        DEFAULT_TEXT,
-        Some(events.executeHolder.request),
-        Set.empty,
-        Map.empty))
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationStarted])
   }
 
   test("SPARK-43923: post analyzed with plan") {
@@ -75,13 +81,18 @@ class ExecuteEventsManagerSuite
 
     val mockPlan = mock[LogicalPlan]
     events.postAnalyzed(Some(mockPlan))
-    val event = SparkListenerConnectOperationAnalyzed(
+    val expectedEvent = SparkListenerConnectOperationAnalyzed(
       events.executeHolder.jobTag,
       DEFAULT_QUERY_ID,
       DEFAULT_CLOCK.getTimeMillis())
-    event.analyzedPlan = Some(mockPlan)
+    expectedEvent.analyzedPlan = Some(mockPlan)
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(event)
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationAnalyzed])
   }
 
   test("SPARK-43923: post analyzed with empty plan") {
@@ -98,47 +109,67 @@ class ExecuteEventsManagerSuite
   test("SPARK-43923: post readyForExecution") {
     val events = setupEvents(ExecuteStatus.Analyzed)
     events.postReadyForExecution()
-    val event = SparkListenerConnectOperationReadyForExecution(
+    val expectedEvent = SparkListenerConnectOperationReadyForExecution(
       events.executeHolder.jobTag,
       DEFAULT_QUERY_ID,
      DEFAULT_CLOCK.getTimeMillis())
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(event)
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationReadyForExecution])
   }
 
   test("SPARK-43923: post canceled") {
     val events = setupEvents(ExecuteStatus.Started)
     events.postCanceled()
+    val expectedEvent = SparkListenerConnectOperationCanceled(
+      events.executeHolder.jobTag,
+      DEFAULT_QUERY_ID,
+      DEFAULT_CLOCK.getTimeMillis())
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(
-        SparkListenerConnectOperationCanceled(
-          events.executeHolder.jobTag,
-          DEFAULT_QUERY_ID,
-          DEFAULT_CLOCK.getTimeMillis()))
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationCanceled])
   }
 
   test("SPARK-43923: post failed") {
     val events = setupEvents(ExecuteStatus.Started)
     events.postFailed(DEFAULT_ERROR)
+    val expectedEvent = SparkListenerConnectOperationFailed(
+      events.executeHolder.jobTag,
+      DEFAULT_QUERY_ID,
+      DEFAULT_CLOCK.getTimeMillis(),
+      DEFAULT_ERROR,
+      Map.empty[String, String])
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(
-        SparkListenerConnectOperationFailed(
-          events.executeHolder.jobTag,
-          DEFAULT_QUERY_ID,
-          DEFAULT_CLOCK.getTimeMillis(),
-          DEFAULT_ERROR,
-          Map.empty[String, String]))
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationFailed])
   }
 
   test("SPARK-43923: post finished") {
     val events = setupEvents(ExecuteStatus.Started)
     events.postFinished()
+    val expectedEvent = SparkListenerConnectOperationFinished(
+      events.executeHolder.jobTag,
+      DEFAULT_QUERY_ID,
+      DEFAULT_CLOCK.getTimeMillis())
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(
-        SparkListenerConnectOperationFinished(
-          events.executeHolder.jobTag,
-          DEFAULT_QUERY_ID,
-          DEFAULT_CLOCK.getTimeMillis()))
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationFinished])
   }
 
   test("SPARK-44776: post finished with row number") {
@@ -156,12 +187,17 @@ class ExecuteEventsManagerSuite
   test("SPARK-43923: post closed") {
     val events = setupEvents(ExecuteStatus.Finished)
     events.postClosed()
+    val expectedEvent = SparkListenerConnectOperationClosed(
+      events.executeHolder.jobTag,
+      DEFAULT_QUERY_ID,
+      DEFAULT_CLOCK.getTimeMillis())
    verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus, times(1))
-      .post(
-        SparkListenerConnectOperationClosed(
-          events.executeHolder.jobTag,
-          DEFAULT_QUERY_ID,
-          DEFAULT_CLOCK.getTimeMillis()))
+      .post(expectedEvent)
+
+    assert(
+      JsonProtocol
+        .sparkEventFromJson(JsonProtocol.sparkEventToJsonString(expectedEvent))
+        .isInstanceOf[SparkListenerConnectOperationClosed])
   }
 
   test("SPARK-43923: Closed wrong order throws exception") {
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
index 7cdc0135201..3b75c37b2aa 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
@@ -61,7 +61,6 @@ class SparkConnectServerListenerSuite
           "userId",
           "userName",
           "dummy query",
-          None,
           Set()))
       listener.onOtherEvent(
        SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
@@ -153,7 +152,6 @@ class SparkConnectServerListenerSuite
         "userId",
         "userName",
         "dummy query",
-        None,
         Set()))
     listener.onOtherEvent(
      SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
@@ -199,7 +197,6 @@ class SparkConnectServerListenerSuite
         "userId",
         "userName",
         "dummy query",
-        None,
         Set()))
     listener.onOtherEvent(
       SparkListenerConnectOperationAnalyzed(
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
index 99d0a14f1e8..d352f4e2c32 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
@@ -65,7 +65,6 @@ class SparkConnectServerPageSuite
         "userId",
         "userName",
         "dummy query",
-        None,
         Set()))
     listener.onOtherEvent(
       SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", 
System.currentTimeMillis()))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
