WweiL commented on code in PR #46929:
URL: https://github.com/apache/spark/pull/46929#discussion_r1700575228


##########
connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -238,17 +238,45 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
         }
         completed = true // no longer interruptible
 
-        if (executeHolder.reattachable) {
-          // Reattachable execution sends a ResultComplete at the end of the stream
-          // to signal that there isn't more coming.
-          executeHolder.responseObserver.onNextComplete(createResultComplete())
-        } else {
-          executeHolder.responseObserver.onCompleted()
+        // If the request returns a long running iterator (e.g. StreamingQueryListener needs
+        // a long-running iterator to continuously stream back events),
+        // we delegate the sending of the final ResultComplete to the handler itself.
+        if (!createsLongRunningIterator(executeHolder.request)) {
+          if (executeHolder.reattachable) {
+            // Reattachable execution sends a ResultComplete at the end of the stream
+            // to signal that there isn't more coming.
+            executeHolder.responseObserver.onNextComplete(createResultComplete())
+          } else {
+            executeHolder.responseObserver.onCompleted()
+          }
         }
       }
     }
   }
 
+  /**
+   * Perform a check to see if the request creates a long running iterator. Currently, only the
+   * ADD_LISTENER_BUS_LISTENER command creates a long running iterator. This is used to
+   * continuously stream back events to the client side StreamingQueryListener.
+   * @param request The request to check
+   * @return True if the iterator is long running
+   */
+  private def createsLongRunningIterator(request: proto.ExecutePlanRequest): Boolean = {

Review Comment:
   I renamed it to `shouldDelegateCompleteResponse`. The previous name was too
broad: a streaming query also starts a new background thread (to run the query),
but we don't need to delegate the sending of ResultComplete to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to