WweiL commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329441472


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }
 
   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment:
   @HyukjinKwon @taku-k Do either of you know whether this EOFException happens often in the existing PySpark tests? Here is the scenario I have in mind:
   
   In the regular PythonRunner, an EOF would most likely make the task fail, but Spark handles that gracefully by retrying the task.
   
   Here, however, there is no retry logic. If the socket hits an error, the streaming query would simply fail with this error.
   
   Since the streaming Python runners are long-running processes, do you think adding retry logic would be worthwhile as a follow-up?
   
   cc @rangadi 
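As a rough illustration of the follow-up idea, here is a minimal sketch of a bounded retry wrapper that retries only on `EOFException`. It is not existing Spark code. `ListenerRetrySketch`, `withEofRetry`, `maxAttempts`, `backoffMs` and `reconnect` are all hypothetical names. The sketch assumes that recovery means "re-establish the Python worker, then call the listener function again". A real implementation would also have to decide whether re-sending the same listener event is safe, and this sketch does not model that.

```scala
import java.io.EOFException

object ListenerRetrySketch {
  // Hypothetical helper (not a Spark API): evaluates `op`; if it throws an
  // EOFException (e.g. the Python worker's socket was closed), it sleeps with a
  // linear backoff, calls `reconnect`, and retries, making at most
  // `maxAttempts` attempts in total. Any other exception, or an EOFException
  // on the last attempt, propagates to the caller unchanged.
  def withEofRetry[T](maxAttempts: Int, backoffMs: Long)(reconnect: () => Unit)(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(op)
      } catch {
        case _: EOFException if attempt < maxAttempts =>
          // Treat EOF as possibly transient: back off, re-establish the worker, retry.
          Thread.sleep(backoffMs * attempt)
          reconnect()
          attempt += 1
      }
    }
    result.get
  }
}
```

Limiting the retry to `EOFException` keeps genuine user errors from being retried. A `PythonException` raised by the listener itself would still fail fast.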



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
