This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c284faa  [SPARK-38556][PYTHON] Disable Pandas usage logging for method calls inside @contextmanager functions
c284faa is described below

commit c284faad2d7d3b813c1c94c612b814c129b6dad3
Author: Yihong He <yihong...@databricks.com>
AuthorDate: Thu Mar 17 10:03:42 2022 +0900

[SPARK-38556][PYTHON] Disable Pandas usage logging for method calls inside @contextmanager functions

### What changes were proposed in this pull request?

Wrap the AbstractContextManager returned by a @contextmanager-decorated function when it is returned from a wrapped function call. The comment in the code change explains why a wrapper class is used instead of wrapping the functions of AbstractContextManager directly.

### Why are the changes needed?

Currently, method calls made inside @contextmanager functions are treated as external calls by **with** statements. For example, the code below records the config.set_option calls made inside ps.option_context(...):

```python
with ps.option_context("compute.ops_on_diff_frames", True):
    pass
```

We should disable usage logging for calls inside @contextmanager functions to improve the accuracy of the usage data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Existing tests
- Manual test by running `./bin/pyspark` and verifying the output:

```
>>> sc.setLogLevel("info")
>>> import pyspark.pandas as ps
22/03/15 17:10:50 INFO Log4jUsageLogger: pandasOnSparkImported=1.0, tags=List(), blob=
>>> with ps.option_context("compute.ops_on_diff_frames", True):
...     pass
...
22/03/15 17:11:17 INFO Log4jUsageLogger: pandasOnSparkFunctionCalled=1.0, tags=List(pandasOnSparkFunction=option_context(*args: Any) -> Iterator[NoneType], className=config, status=success), blob={"duration": 0.1615259999994123}
22/03/15 17:11:18 INFO Log4jUsageLogger: initialConfigLogging=1.0, tags=List(sparkApplicationId=local-1647360645198, sparkExecutionId=null, sparkJobGroupId=null), blob={"spark.sql.warehouse.dir":"file:/Users/yihong.he/spark/spark-warehouse","spark.executor.extraJavaOptions":"-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL [...]
22/03/15 17:11:19 INFO Log4jUsageLogger: pandasOnSparkFunctionCalled=1.0, tags=List(pandasOnSparkFunction=option_context.__enter__(), className=config, status=success), blob={"duration": 1594.1569399999978}
22/03/15 17:11:19 INFO Log4jUsageLogger: pandasOnSparkFunctionCalled=1.0, tags=List(pandasOnSparkFunction=option_context.__exit__(type, value, traceback), className=config, status=success), blob={"duration": 12.610170000002086}
```

Closes #35861 from heyihong/SPARK-38556.
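As a minimal sketch of why this works (standalone Python, not the patched PySpark code; `toy_option_context` is a hypothetical stand-in for `ps.option_context`): calling a @contextmanager-decorated function returns a `_GeneratorContextManager` instance, which subclasses `contextlib.AbstractContextManager`, so an `isinstance` check against `AbstractContextManager` catches exactly these results.

```python
# Minimal sketch: a @contextmanager function returns a
# _GeneratorContextManager, which subclasses AbstractContextManager,
# so isinstance(res, AbstractContextManager) identifies these results.
# "toy_option_context" is a hypothetical stand-in for ps.option_context.
from contextlib import AbstractContextManager, contextmanager
from typing import Iterator


@contextmanager
def toy_option_context() -> Iterator[None]:
    # Any calls made here run inside __enter__/__exit__ of the returned
    # context manager object, not at the "with" statement's call site.
    yield


res = toy_option_context()
print(type(res).__name__)                       # _GeneratorContextManager
print(isinstance(res, AbstractContextManager))  # True
```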
Authored-by: Yihong He <yihong...@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
(cherry picked from commit 7d1ff01299c88a1aadfac032ea0b3ef87f4ae50d)
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/instrumentation_utils.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/python/pyspark/instrumentation_utils.py b/python/pyspark/instrumentation_utils.py
index 908f5cb..b9aacf6 100644
--- a/python/pyspark/instrumentation_utils.py
+++ b/python/pyspark/instrumentation_utils.py
@@ -21,6 +21,7 @@ import inspect
 import threading
 import importlib
 import time
+from contextlib import AbstractContextManager
 from types import ModuleType
 from typing import Tuple, Union, List, Callable, Any, Type
 
@@ -30,6 +31,24 @@ __all__: List[str] = []
 
 _local = threading.local()
 
+class _WrappedAbstractContextManager(AbstractContextManager):
+    def __init__(
+        self, acm: AbstractContextManager, class_name: str, function_name: str, logger: Any
+    ):
+        self._enter_func = _wrap_function(
+            class_name, "{}.__enter__".format(function_name), acm.__enter__, logger
+        )
+        self._exit_func = _wrap_function(
+            class_name, "{}.__exit__".format(function_name), acm.__exit__, logger
+        )
+
+    def __enter__(self):  # type: ignore[no-untyped-def]
+        return self._enter_func()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):  # type: ignore[no-untyped-def]
+        return self._exit_func(exc_type, exc_val, exc_tb)
+
+
 def _wrap_function(class_name: str, function_name: str, func: Callable, logger: Any) -> Callable:
 
     signature = inspect.signature(func)
@@ -44,6 +63,17 @@ def _wrap_function(class_name: str, function_name: str, func: Callable, logger:
            start = time.perf_counter()
            try:
                res = func(*args, **kwargs)
+                if isinstance(res, AbstractContextManager):
+                    # Wrap the AbstractContextManager subclasses returned by @contextmanager-
+                    # decorated functions so that wrapped function calls inside __enter__ and
+                    # __exit__ are not recorded by the usage logger.
+                    #
+                    # The reason to add a wrapper class after function calls, instead of
+                    # wrapping the __enter__ and __exit__ methods of the
+                    # _GeneratorContextManager class, is that usage logging should be
+                    # disabled only for functions with the @contextmanager decorator in PySpark.
+                    res = _WrappedAbstractContextManager(res, class_name, function_name, logger)
+
                logger.log_success(
                    class_name, function_name, time.perf_counter() - start, signature
                )
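To see the mechanism end to end, here is a runnable toy version of the patched flow. It is a sketch under assumptions: `ToyLogger`, `set_option`, and the `_local.logging` re-entrancy guard are illustrative stand-ins, since the real `_wrap_function` body and usage logger are only partially visible in this diff.

```python
import functools
import threading
import time
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Callable, Iterator

_local = threading.local()


class ToyLogger:
    # Stand-in for PySpark's usage logger (assumption, not the real API).
    def log_success(self, class_name: str, function_name: str, duration: float) -> None:
        print("logged: {}.{} ({:.6f}s)".format(class_name, function_name, duration))


def _wrap_function(class_name: str, function_name: str, func: Callable, logger: Any) -> Callable:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        if getattr(_local, "logging", False):
            # Internal call: another wrapped function is already on the
            # stack, so record nothing (assumed re-entrancy guard).
            return func(*args, **kwargs)
        _local.logging = True
        try:
            start = time.perf_counter()
            res = func(*args, **kwargs)
            if isinstance(res, AbstractContextManager):
                # The patched behavior: route __enter__/__exit__ through
                # wrapped functions so their inner calls count as internal.
                res = _WrappedAbstractContextManager(res, class_name, function_name, logger)
            logger.log_success(class_name, function_name, time.perf_counter() - start)
            return res
        finally:
            _local.logging = False

    return wrapper


class _WrappedAbstractContextManager(AbstractContextManager):
    def __init__(self, acm: AbstractContextManager, class_name: str, function_name: str, logger: Any):
        self._enter_func = _wrap_function(class_name, function_name + ".__enter__", acm.__enter__, logger)
        self._exit_func = _wrap_function(class_name, function_name + ".__exit__", acm.__exit__, logger)

    def __enter__(self) -> Any:
        return self._enter_func()

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any:
        return self._exit_func(exc_type, exc_val, exc_tb)


def set_option(key: str, value: Any) -> None:  # hypothetical stand-in for config.set_option
    pass


@contextmanager
def option_context(key: str, value: Any) -> Iterator[None]:
    set_option(key, value)  # runs inside __enter__: suppressed by the guard
    yield
    set_option(key, None)   # runs inside __exit__: suppressed by the guard


logger = ToyLogger()
set_option = _wrap_function("config", "set_option", set_option, logger)
option_context = _wrap_function("config", "option_context", option_context, logger)

with option_context("compute.ops_on_diff_frames", True):
    pass
# Prints "logged:" lines for option_context, option_context.__enter__ and
# option_context.__exit__, but none for the inner set_option calls.
```

As the comment in the diff explains, wrapping the returned object at the call site keeps the suppression scoped to PySpark's own @contextmanager functions, rather than globally patching the __enter__ and __exit__ methods of _GeneratorContextManager for every library.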