This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 555aece74d2 [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
555aece74d2 is described below

commit 555aece74d2a22d312e815ec07f5553800e14b9d
Author: Alice Sayutina <alice.sayut...@databricks.com>
AuthorDate: Fri Sep 22 12:31:23 2023 +0900

    [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
    
    ### What changes were proposed in this pull request?
    
    Add error logging to `addArtifact` (see the example under "How was this
    patch tested?"). The logging code is moved into a separate file to avoid a
    circular dependency.
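    For context, a rough sketch of the resulting import layering (module names
    as in this diff; illustrative only):

    ```
    # Both core.py and artifact.py now import the shared logger from the new
    # leaf module, instead of artifact.py importing it from core.py (which
    # itself imports ArtifactManager from artifact.py and would form a cycle):
    #
    #   core.py      -> from pyspark.sql.connect.client.logging import logger
    #   artifact.py  -> from pyspark.sql.connect.client.logging import logger
    from pyspark.sql.connect.client.logging import logger, getLogLevel
    ```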
    
    ### Why are the changes needed?
    
    Currently, when `addArtifact` is called with a file that doesn't exist, the
    user gets a cryptic error:
    
    ```
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception iterating requests!"
            debug_error_string = "None"
    >
    ```
    
    This is hard to debug without digging deep into the internals.
    
    This happens because `addArtifact` is implemented as a client-side streaming
    RPC, and the actual error is raised while grpc consumes the iterator that
    generates the requests. Unfortunately, grpc doesn't print any debug
    information that would let the user understand the problem.
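    The fix wraps the request generator so that the underlying exception is
    logged before grpc turns it into the generic error above. A minimal,
    self-contained sketch of this pattern (not the actual Spark code;
    `_requests` below is a hypothetical stand-in for the real request
    generator):

    ```
    import logging

    logger = logging.getLogger(__name__)

    def _requests(paths):
        # Stand-in for the real request generator: opening a missing file here
        # raises FileNotFoundError while grpc is already iterating the requests.
        for p in paths:
            with open(p, "rb") as f:
                yield f.read()

    def _requests_with_logging(paths):
        # The pattern applied in this PR: log the root cause, then re-raise so
        # the caller still sees the original grpc error.
        try:
            yield from _requests(paths)
        except Exception as e:
            logger.error(f"Failed to execute addArtifact: {e}")
            raise
    ```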
    
    ### Does this PR introduce _any_ user-facing change?
    
    Additional logging, which is opt-in in the same way as before via the
    `SPARK_CONNECT_LOG_LEVEL` environment variable.
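    For example (a small sketch based on this diff; note the logger is
    configured at module import time, so the variable must be set before
    `pyspark.sql.connect.client` is first imported):

    ```
    import os
    os.environ["SPARK_CONNECT_LOG_LEVEL"] = "ERROR"  # opt in before the first import

    from pyspark.sql.connect.client import getLogLevel
    print(getLogLevel())  # 40 (logging.ERROR) here; None when logging is disabled
    ```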
    
    ### How was this patch tested?
    
    ```
    >>> s.addArtifact("XYZ", file=True)
    [New:] 2023-09-15 17:06:40,078 11789 ERROR _create_requests Failed to execute addArtifact: [Errno 2] No such file or directory: '/Users/alice.sayutina/apache_spark/python/XYZ'
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/Users/alice.sayutina/apache_spark/python/pyspark/sql/connect/session.py", line 743, in addArtifacts
        self._client.add_artifacts(*path, pyfile=pyfile, archive=archive, file=file)
    
    [....]
    
      File "/Users/alice.sayutina/oss-venv/lib/python3.11/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception iterating requests!"
            debug_error_string = "None"
    >
    
    ```
    
    Closes #42949 from cdkrot/SPARK-45093.
    
    Lead-authored-by: Alice Sayutina <alice.sayut...@databricks.com>
    Co-authored-by: Alice Sayutina <cdkr...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/__init__.py |  1 +
 python/pyspark/sql/connect/client/artifact.py | 16 +++++--
 python/pyspark/sql/connect/client/core.py     | 38 +----------------
 python/pyspark/sql/connect/client/logging.py  | 60 +++++++++++++++++++++++++++
 4 files changed, 74 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/sql/connect/client/__init__.py b/python/pyspark/sql/connect/client/__init__.py
index 469d1c519a5..38523352e5b 100644
--- a/python/pyspark/sql/connect/client/__init__.py
+++ b/python/pyspark/sql/connect/client/__init__.py
@@ -20,3 +20,4 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)
 
 from pyspark.sql.connect.client.core import *  # noqa: F401,F403
+from pyspark.sql.connect.client.logging import getLogLevel  # noqa: F401
diff --git a/python/pyspark/sql/connect/client/artifact.py b/python/pyspark/sql/connect/client/artifact.py
index c858768ccbf..fb31a57e0f6 100644
--- a/python/pyspark/sql/connect/client/artifact.py
+++ b/python/pyspark/sql/connect/client/artifact.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 from pyspark.sql.connect.utils import check_dependencies
+from pyspark.sql.connect.client.logging import logger
 
 check_dependencies(__name__)
 
@@ -243,11 +244,18 @@ class ArtifactManager:
         self, *path: str, pyfile: bool, archive: bool, file: bool
     ) -> Iterator[proto.AddArtifactsRequest]:
         """Separated for the testing purpose."""
-        return self._add_artifacts(
-            chain(
-                *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+        try:
+            yield from self._add_artifacts(
+                chain(
+                    *(
+                        self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file)
+                        for p in path
+                    )
+                )
             )
-        )
+        except Exception as e:
+            logger.error(f"Failed to submit addArtifacts request: {e}")
+            raise
 
     def _retrieve_responses(
         self, requests: Iterator[proto.AddArtifactsRequest]
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index ce6ea8ba3ee..34a867ce101 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -17,7 +17,6 @@
 __all__ = [
     "ChannelBuilder",
     "SparkConnectClient",
-    "getLogLevel",
 ]
 
 from pyspark.sql.connect.utils import check_dependencies
@@ -25,7 +24,6 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)
 
 import threading
-import logging
 import os
 import platform
 import random
@@ -66,6 +64,7 @@ from google.rpc import error_details_pb2
 from pyspark.version import __version__
 from pyspark.resource.information import ResourceInformation
 from pyspark.sql.connect.client.artifact import ArtifactManager
+from pyspark.sql.connect.client.logging import logger
 from pyspark.sql.connect.client.reattach import (
     ExecutePlanResponseReattachableIterator,
     RetryException,
@@ -100,41 +99,6 @@ if TYPE_CHECKING:
     from pyspark.sql.connect._typing import DataTypeOrString
 
 
-def _configure_logging() -> logging.Logger:
-    """Configure logging for the Spark Connect clients."""
-    logger = logging.getLogger(__name__)
-    handler = logging.StreamHandler()
-    handler.setFormatter(
-        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
-    )
-    logger.addHandler(handler)
-
-    # Check the environment variables for log levels:
-    if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
-        logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
-    else:
-        logger.disabled = True
-    return logger
-
-
-# Instantiate the logger based on the environment configuration.
-logger = _configure_logging()
-
-
-def getLogLevel() -> Optional[int]:
-    """
-    This returns this log level as integer, or none (if no logging is enabled).
-
-    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
-
-    .. versionadded:: 3.5.0
-    """
-
-    if not logger.disabled:
-        return logger.level
-    return None
-
-
 class ChannelBuilder:
     """
     This is a helper class that is used to create a GRPC channel based on the given
diff --git a/python/pyspark/sql/connect/client/logging.py b/python/pyspark/sql/connect/client/logging.py
new file mode 100644
index 00000000000..7fdcfaca4cf
--- /dev/null
+++ b/python/pyspark/sql/connect/client/logging.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import logging
+import os
+from typing import Optional
+
+__all__ = [
+    "getLogLevel",
+]
+
+
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger(__name__)
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
+        logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
+    else:
+        logger.disabled = True
+    return logger
+
+
+# Instantiate the logger based on the environment configuration.
+logger = _configure_logging()
+
+
+def getLogLevel() -> Optional[int]:
+    """
+    This returns this log level as integer, or none (if no logging is enabled).
+
+    Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL'
+
+    .. versionadded:: 3.5.0
+    """
+
+    if not logger.disabled:
+        return logger.level
+    return None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
