grundprinzip commented on code in PR #39695: URL: https://github.com/apache/spark/pull/39695#discussion_r1083641323
########## python/pyspark/sql/connect/client.py: ########## @@ -567,54 +602,48 @@ def _execute_and_fetch( logger.info("ExecuteAndFetch") m: Optional[pb2.ExecutePlanResponse.Metrics] = None - batches: List[pa.RecordBatch] = [] try: - for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()): - if b.client_id != self._session_id: - raise SparkConnectException( - "Received incorrect session identifier for request." - ) - if b.metrics is not None: - logger.debug("Received metric batch.") - m = b.metrics - if b.HasField("arrow_batch"): - logger.debug( - f"Received arrow batch rows={b.arrow_batch.row_count} " - f"size={len(b.arrow_batch.data)}" - ) - - with pa.ipc.open_stream(b.arrow_batch.data) as reader: - for batch in reader: - assert isinstance(batch, pa.RecordBatch) - batches.append(batch) + for attempt in Retrying(SparkConnectClient.retry_exception, **self._retry_policy): + with attempt: + for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()): + if b.client_id != self._session_id: + raise SparkConnectException( + "Received incorrect session identifier for request." + ) + if b.metrics is not None: + logger.debug("Received metric batch.") + m = b.metrics + if b.HasField("arrow_batch"): + logger.debug( + f"Received arrow batch rows={b.arrow_batch.row_count} " + f"size={len(b.arrow_batch.data)}" + ) + + with pa.ipc.open_stream(b.arrow_batch.data) as reader: + for batch in reader: + assert isinstance(batch, pa.RecordBatch) + batches.append(batch) except grpc.RpcError as rpc_error: self._handle_error(rpc_error) - assert len(batches) > 0 - table = pa.Table.from_batches(batches=batches) - metrics: List[PlanMetrics] = self._build_metrics(m) if m is not None else [] - return table, metrics def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn: """ Error handling helper for dealing with GRPC Errors. On the server side, certain exceptions are enriched with additional RPC Status information. These are unpacked in this function and put into the exception. - To avoid overloading the user with GRPC errors, this message explicitly swallows the error context from the call. This GRPC Error is logged however, and can be enabled. - Parameters ---------- rpc_error : grpc.RpcError RPC Error containing the details of the exception. - Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org