Hyukjin Kwon created SPARK-54478:
------------------------------------

             Summary: Re-enable pyspark.sql.connect.streaming.readwriter.DataStreamWriter.foreachBatch
                 Key: SPARK-54478
                 URL: https://issues.apache.org/jira/browse/SPARK-54478
             Project: Spark
          Issue Type: Sub-task
          Components: Tests
    Affects Versions: 4.0.1, 4.1.0, 4.2.0
            Reporter: Hyukjin Kwon
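
The doctest below currently fails in CI with a Python worker connection timeout when starting the foreachBatch query: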


{code}
pyspark.sql.connect.streaming.readwriter.DataStreamWriter.foreachBatch
Failed example:
    q = df.writeStream.foreachBatch(func).start()
Exception raised:
    Traceback (most recent call last):
      File "/opt/hostedtoolcache/Python/3.11.14/x64/lib/python3.11/doctest.py", 
line 1355, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest 
pyspark.sql.connect.streaming.readwriter.DataStreamWriter.foreachBatch[4]>", 
line 1, in <module>
        q = df.writeStream.foreachBatch(func).start()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
        return self._start_internal(
               ^^^^^^^^^^^^^^^^^^^^^
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
        (_, properties, _) = self._session.client.execute_command(cmd)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
        data, _, metrics, observed_metrics, properties = 
self._execute_and_fetch(
                                                         
^^^^^^^^^^^^^^^^^^^^^^^^
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
        self._handle_error(error)
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
        self._handle_rpc_error(error)
      File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
        raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to 
connect back.
    JVM stacktrace:
    org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the 
Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
**********************************************************************
File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 626, in 
pyspark.sql.connect.streaming.readwriter.DataStreamWriter.foreachBatch
Failed example:
    q.stop()
Exception raised:
    Traceback (most recent call last):
      File "/opt/hostedtoolcache/Python/3.11.14/x64/lib/python3.11/doctest.py", 
line 1355, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest 
pyspark.sql.connect.streaming.readwriter.DataStreamWriter.foreachBatch[6]>", 
line 1, in <module>
        q.stop()
        ^
    NameError: name 'q' is not defined
**********************************************************************
{code}
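
For reference, the failing example exercises roughly this pattern (a minimal sketch, not the exact doctest: the {{rate}} source, the {{remote("local[4]")}} session, and the sleep are assumptions to make it self-contained):

{code}
import time

from pyspark.sql import SparkSession

# Assumption: a local Spark Connect session, as the connect doctests use.
spark = SparkSession.builder.remote("local[4]").getOrCreate()

# The micro-batch sink function passed to foreachBatch, as in the doctest.
def func(batch_df, batch_id):
    batch_df.collect()

df = spark.readStream.format("rate").load()

# This is the step that fails in CI: start() launches a Python worker for
# foreachBatch on the server side, and that worker times out while
# connecting back to the JVM.
q = df.writeStream.foreachBatch(func).start()
time.sleep(3)
q.stop()
{code}

The second doctest failure ({{NameError: name 'q' is not defined}}) is just fallout from the first: {{q}} is never bound because {{start()}} raises.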



