This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 555aece74d2  [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
555aece74d2 is described below

commit 555aece74d2a22d312e815ec07f5553800e14b9d
Author: Alice Sayutina <alice.sayut...@databricks.com>
AuthorDate: Fri Sep 22 12:31:23 2023 +0900

[SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query

### What changes were proposed in this pull request?

Add error logging to `addArtifact` (see the example under "How was this patch tested?"). The logging code is moved into a separate file to avoid a circular dependency.

### Why are the changes needed?

Currently, when `addArtifact` is called with a file that doesn't exist, the user gets a cryptic error:

```
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating requests!"
    debug_error_string = "None"
>
```

This is hard to debug without digging deeply into the subject. It happens because `addArtifact` is implemented as a client-side streaming RPC, and the actual error is raised while grpc consumes the iterator that generates the requests. Unfortunately, grpc doesn't print any debug information that would help the user understand the problem.

### Does this PR introduce _any_ user-facing change?

Additional logging, which is opt-in the same way as before via the `SPARK_CONNECT_LOG_LEVEL` environment variable.

### How was this patch tested?

```
>>> s.addArtifact("XYZ", file=True)

[New:]
2023-09-15 17:06:40,078 11789 ERROR _create_requests Failed to execute addArtifact: [Errno 2] No such file or directory: '/Users/alice.sayutina/apache_spark/python/XYZ'

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/alice.sayutina/apache_spark/python/pyspark/sql/connect/session.py", line 743, in addArtifacts
    self._client.add_artifacts(*path, pyfile=pyfile, archive=archive, file=file)

[....]

  File "/Users/alice.sayutina/oss-venv/lib/python3.11/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating requests!"
    debug_error_string = "None"
>
```

Closes #42949 from cdkrot/SPARK-45093.
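As a standalone illustration of the failure mode described above (this sketch is not part of the patch; the file name "XYZ" and the chunked-read generator are placeholders for the real AddArtifactsRequest stream), an exception raised inside a request generator only reaches the user as "Exception iterating requests!", so logging it before re-raising is what makes the root cause visible:

```
# Minimal sketch, assuming only the Python standard library.
import logging
from typing import Iterator

logging.basicConfig(
    format="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s",
    level=logging.ERROR,
)
logger = logging.getLogger(__name__)


def _create_requests(path: str) -> Iterator[bytes]:
    """Yield file chunks as streaming requests (stand-in for AddArtifactsRequest)."""
    try:
        with open(path, "rb") as f:
            while chunk := f.read(32 * 1024):
                yield chunk
    except Exception as e:
        # Without this log line, a gRPC client-streaming call would only report
        # "Exception iterating requests!" while consuming the iterator.
        logger.error(f"Failed to submit addArtifacts request: {e}")
        raise


try:
    list(_create_requests("XYZ"))  # "XYZ" does not exist
except FileNotFoundError:
    pass  # the ERROR line is printed before the exception propagates
```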
Lead-authored-by: Alice Sayutina <alice.sayut...@databricks.com>
Co-authored-by: Alice Sayutina <cdkr...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/__init__.py |  1 +
 python/pyspark/sql/connect/client/artifact.py | 16 +++++--
 python/pyspark/sql/connect/client/core.py     | 38 +----------------
 python/pyspark/sql/connect/client/logging.py  | 60 +++++++++++++++++++++++++++
 4 files changed, 74 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/sql/connect/client/__init__.py b/python/pyspark/sql/connect/client/__init__.py
index 469d1c519a5..38523352e5b 100644
--- a/python/pyspark/sql/connect/client/__init__.py
+++ b/python/pyspark/sql/connect/client/__init__.py
@@ -20,3 +20,4 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)
 
 from pyspark.sql.connect.client.core import *  # noqa: F401,F403
+from pyspark.sql.connect.client.logging import getLogLevel  # noqa: F401
diff --git a/python/pyspark/sql/connect/client/artifact.py b/python/pyspark/sql/connect/client/artifact.py
index c858768ccbf..fb31a57e0f6 100644
--- a/python/pyspark/sql/connect/client/artifact.py
+++ b/python/pyspark/sql/connect/client/artifact.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 from pyspark.sql.connect.utils import check_dependencies
+from pyspark.sql.connect.client.logging import logger
 
 check_dependencies(__name__)
 
@@ -243,11 +244,18 @@ class ArtifactManager:
         self, *path: str, pyfile: bool, archive: bool, file: bool
     ) -> Iterator[proto.AddArtifactsRequest]:
         """Separated for the testing purpose."""
-        return self._add_artifacts(
-            chain(
-                *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+        try:
+            yield from self._add_artifacts(
+                chain(
+                    *(
+                        self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file)
+                        for p in path
+                    )
+                )
             )
-        )
+        except Exception as e:
+            logger.error(f"Failed to submit addArtifacts request: {e}")
+            raise
 
     def _retrieve_responses(
         self, requests: Iterator[proto.AddArtifactsRequest]
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index ce6ea8ba3ee..34a867ce101 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -17,7 +17,6 @@
 __all__ = [
     "ChannelBuilder",
     "SparkConnectClient",
-    "getLogLevel",
 ]
 
 from pyspark.sql.connect.utils import check_dependencies
@@ -25,7 +24,6 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)
 
 import threading
-import logging
 import os
 import platform
 import random
@@ -66,6 +64,7 @@ from google.rpc import error_details_pb2
 from pyspark.version import __version__
 from pyspark.resource.information import ResourceInformation
 from pyspark.sql.connect.client.artifact import ArtifactManager
+from pyspark.sql.connect.client.logging import logger
 from pyspark.sql.connect.client.reattach import (
     ExecutePlanResponseReattachableIterator,
     RetryException,
@@ -100,41 +99,6 @@ if TYPE_CHECKING:
     from pyspark.sql.connect._typing import DataTypeOrString
 
 
-def _configure_logging() -> logging.Logger:
-    """Configure logging for the Spark Connect clients."""
-    logger = logging.getLogger(__name__)
-    handler = logging.StreamHandler()
-    handler.setFormatter(
-        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
-    )
-    logger.addHandler(handler)
-
-    # Check the environment variables for log levels:
-    if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
-        logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
-    else:
-        logger.disabled = True
-    return logger
-
-
-# Instantiate the logger based on the environment configuration.
-logger = _configure_logging()
-
-
-def getLogLevel() -> Optional[int]:
-    """
-    This returns this log level as integer, or none (if no logging is enabled).
-
-    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
-
-    .. versionadded:: 3.5.0
-    """
-
-    if not logger.disabled:
-        return logger.level
-    return None
-
-
 class ChannelBuilder:
     """
     This is a helper class that is used to create a GRPC channel based on the given
diff --git a/python/pyspark/sql/connect/client/logging.py b/python/pyspark/sql/connect/client/logging.py
new file mode 100644
index 00000000000..7fdcfaca4cf
--- /dev/null
+++ b/python/pyspark/sql/connect/client/logging.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import logging
+import os
+from typing import Optional
+
+__all__ = [
+    "getLogLevel",
+]
+
+
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger(__name__)
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
+        logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
+    else:
+        logger.disabled = True
+    return logger
+
+
+# Instantiate the logger based on the environment configuration.
+logger = _configure_logging()
+
+
+def getLogLevel() -> Optional[int]:
+    """
+    This returns this log level as integer, or none (if no logging is enabled).
+
+    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
+
+    .. versionadded:: 3.5.0
+    """
+
+    if not logger.disabled:
+        return logger.level
+    return None
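For reference, a hedged usage sketch of the opt-in client logging touched by this change. The endpoint `sc://localhost:15002` is a placeholder and a reachable Spark Connect server is assumed; `SPARK_CONNECT_LOG_LEVEL` must be set before the connect client modules are imported, because the logger is configured at import time:

```
import os

# Opt in before importing the Spark Connect client.
os.environ["SPARK_CONNECT_LOG_LEVEL"] = "ERROR"

from pyspark.sql import SparkSession
from pyspark.sql.connect.client import getLogLevel

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Returns the configured level as an int (e.g. logging.ERROR == 40),
# or None when the variable is unset and logging is disabled.
print(getLogLevel())

# A failing upload now logs an ERROR line from _create_requests
# before the grpc._channel._InactiveRpcError is raised.
spark.addArtifacts("XYZ", file=True)
```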