grundprinzip commented on code in PR #39695:
URL: https://github.com/apache/spark/pull/39695#discussion_r1083641393


##########
python/pyspark/sql/connect/client.py:
##########
@@ -567,54 +602,48 @@ def _execute_and_fetch(
         logger.info("ExecuteAndFetch")
 
         m: Optional[pb2.ExecutePlanResponse.Metrics] = None
-
         batches: List[pa.RecordBatch] = []
 
         try:
-            for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
-                if b.client_id != self._session_id:
-                    raise SparkConnectException(
-                        "Received incorrect session identifier for request."
-                    )
-                if b.metrics is not None:
-                    logger.debug("Received metric batch.")
-                    m = b.metrics
-                if b.HasField("arrow_batch"):
-                    logger.debug(
-                        f"Received arrow batch rows={b.arrow_batch.row_count} "
-                        f"size={len(b.arrow_batch.data)}"
-                    )
-
-                    with pa.ipc.open_stream(b.arrow_batch.data) as reader:
-                        for batch in reader:
-                            assert isinstance(batch, pa.RecordBatch)
-                            batches.append(batch)
+            for attempt in Retrying(SparkConnectClient.retry_exception, **self._retry_policy):
+                with attempt:
+                    for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
+                        if b.client_id != self._session_id:
+                            raise SparkConnectException(
+                                "Received incorrect session identifier for request."
+                            )
+                        if b.metrics is not None:
+                            logger.debug("Received metric batch.")
+                            m = b.metrics
+                        if b.HasField("arrow_batch"):
+                            logger.debug(
+                                f"Received arrow batch rows={b.arrow_batch.row_count} "
+                                f"size={len(b.arrow_batch.data)}"
+                            )
+
+                            with pa.ipc.open_stream(b.arrow_batch.data) as reader:
+                                for batch in reader:
+                                    assert isinstance(batch, pa.RecordBatch)
+                                    batches.append(batch)
         except grpc.RpcError as rpc_error:
             self._handle_error(rpc_error)
-
         assert len(batches) > 0
-
         table = pa.Table.from_batches(batches=batches)
-
         metrics: List[PlanMetrics] = self._build_metrics(m) if m is not None else []
-
         return table, metrics
 
     def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
         """
         Error handling helper for dealing with GRPC Errors. On the server side, certain
         exceptions are enriched with additional RPC Status information. These are
         unpacked in this function and put into the exception.
-
         To avoid overloading the user with GRPC errors, this message explicitly
         swallows the error context from the call. This GRPC Error is logged however,
         and can be enabled.
-

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to