This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 675f5f0b599b [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests 675f5f0b599b is described below commit 675f5f0b599ba650abed879ead677dd6cba818a2 Author: Xi Lyu <xi....@databricks.com> AuthorDate: Fri Apr 26 13:15:21 2024 -0400 [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests ### What changes were proposed in this pull request? In [the previous PR](https://github.com/apache/spark/pull/46012), we cache plans of AnalyzePlan requests. We're also enabling it for ExecutePlan in this PR. ### Why are the changes needed? Some operations like spark.sql() issue ExecutePlan requests. By caching them, we can also improve performance if subsequent plans to be analyzed include the plan returned by ExecutePlan as a subtree. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46098 from xi-db/SPARK-47818-plan-cache-followup. 
Authored-by: Xi Lyu <xi....@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../sql/connect/execution/SparkConnectPlanExecution.scala | 2 +- .../sql/tests/connect/test_parity_memory_profiler.py | 15 +++++++++++++++ .../pyspark/sql/tests/connect/test_parity_udf_profiler.py | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala index 32cdae7bae56..4f2b8c945127 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala @@ -71,7 +71,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) val dataframe = Dataset.ofRows( sessionHolder.session, - planner.transformRelation(request.getPlan.getRoot), + planner.transformRelation(request.getPlan.getRoot, cachePlan = true), tracker, shuffleCleanupMode) responseObserver.onNext(createSchemaResponse(request.getSessionId, dataframe.schema)) diff --git a/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py b/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py index 513e49a144e5..c6ef9810c684 100644 --- a/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py +++ b/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py @@ -27,6 +27,20 @@ class MemoryProfilerParityTests(MemoryProfiler2TestsMixin, ReusedConnectTestCase super().setUp() self.spark._profiler_collector._value = None + +class MemoryProfilerWithoutPlanCacheParityTests(MemoryProfilerParityTests): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.spark.conf.set("spark.connect.session.planCache.enabled", False) + + @classmethod + def tearDownClass(cls):
+ try: + cls.spark.conf.unset("spark.connect.session.planCache.enabled") + finally: + super().tearDownClass() + def test_memory_profiler_udf_multiple_actions(self): def action(df): df.collect() @@ -35,6 +49,7 @@ class MemoryProfilerParityTests(MemoryProfiler2TestsMixin, ReusedConnectTestCase with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): _do_computation(self.spark, action=action) + # Without the plan cache, UDF ID will be different for each action self.assertEqual(6, len(self.profile_results), str(list(self.profile_results))) for id in self.profile_results: diff --git a/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py b/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py index dfa56ff0bb88..a1789a50896d 100644 --- a/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py +++ b/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py @@ -27,6 +27,20 @@ class UDFProfilerParityTests(UDFProfiler2TestsMixin, ReusedConnectTestCase): super().setUp() self.spark._profiler_collector._value = None + +class UDFProfilerWithoutPlanCacheParityTests(UDFProfilerParityTests): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.spark.conf.set("spark.connect.session.planCache.enabled", False) + + @classmethod + def tearDownClass(cls): + try: + cls.spark.conf.unset("spark.connect.session.planCache.enabled") + finally: + super().tearDownClass() + def test_perf_profiler_udf_multiple_actions(self): def action(df): df.collect() @@ -35,6 +49,7 @@ class UDFProfilerParityTests(UDFProfiler2TestsMixin, ReusedConnectTestCase): with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): _do_computation(self.spark, action=action) + # Without the plan cache, UDF ID will be different for each action self.assertEqual(6, len(self.profile_results), str(list(self.profile_results))) for id in self.profile_results: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org