This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 675f5f0b599b [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests
675f5f0b599b is described below

commit 675f5f0b599ba650abed879ead677dd6cba818a2
Author: Xi Lyu <xi....@databricks.com>
AuthorDate: Fri Apr 26 13:15:21 2024 -0400

    [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests
    
    ### What changes were proposed in this pull request?
    
    In [the previous PR](https://github.com/apache/spark/pull/46012), we cached the plans of AnalyzePlan requests. This PR also enables plan caching for ExecutePlan requests.
    
    ### Why are the changes needed?
    
    Some operations, such as spark.sql(), issue ExecutePlan requests. By caching their plans as well, we also improve the performance of subsequent Analyze requests whose plans contain the plan returned by ExecutePlan as a subtree.
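
    For illustration, a minimal client-side sketch of the pattern that benefits. This is not part of the change itself, and the endpoint URL is an assumption:

    ```python
    from pyspark.sql import SparkSession

    # Connect to a Spark Connect server (the endpoint is illustrative).
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # spark.sql() issues an ExecutePlan request; with this change, its plan
    # is stored in the server-side session plan cache.
    df = spark.sql("SELECT 1 AS id")

    # Accessing the schema issues an AnalyzePlan request whose plan contains
    # df's plan as a subtree, so the cached entry can be reused.
    df.filter("id > 0").schema
    ```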
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46098 from xi-db/SPARK-47818-plan-cache-followup.
    
    Authored-by: Xi Lyu <xi....@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../sql/connect/execution/SparkConnectPlanExecution.scala |  2 +-
 .../sql/tests/connect/test_parity_memory_profiler.py      | 15 +++++++++++++++
 .../pyspark/sql/tests/connect/test_parity_udf_profiler.py | 15 +++++++++++++++
 3 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index 32cdae7bae56..4f2b8c945127 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -71,7 +71,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
     val dataframe =
       Dataset.ofRows(
         sessionHolder.session,
-        planner.transformRelation(request.getPlan.getRoot),
+        planner.transformRelation(request.getPlan.getRoot, cachePlan = true),
         tracker,
         shuffleCleanupMode)
     responseObserver.onNext(createSchemaResponse(request.getSessionId, dataframe.schema))
diff --git a/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py b/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py
index 513e49a144e5..c6ef9810c684 100644
--- a/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py
+++ b/python/pyspark/sql/tests/connect/test_parity_memory_profiler.py
@@ -27,6 +27,20 @@ class MemoryProfilerParityTests(MemoryProfiler2TestsMixin, ReusedConnectTestCase
         super().setUp()
         self.spark._profiler_collector._value = None
 
+
+class MemoryProfilerWithoutPlanCacheParityTests(MemoryProfilerParityTests):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.spark.conf.set("spark.connect.session.planCache.enabled", False)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            cls.spark.conf.unset("spark.connect.session.planCache.enabled")
+        finally:
+            super().tearDownClass()
+
     def test_memory_profiler_udf_multiple_actions(self):
         def action(df):
             df.collect()
@@ -35,6 +49,7 @@ class MemoryProfilerParityTests(MemoryProfiler2TestsMixin, ReusedConnectTestCase
         with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
             _do_computation(self.spark, action=action)
 
+        # Without the plan cache, UDF ID will be different for each action
         self.assertEqual(6, len(self.profile_results), str(list(self.profile_results)))
 
         for id in self.profile_results:
diff --git a/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py b/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py
index dfa56ff0bb88..a1789a50896d 100644
--- a/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py
+++ b/python/pyspark/sql/tests/connect/test_parity_udf_profiler.py
@@ -27,6 +27,20 @@ class UDFProfilerParityTests(UDFProfiler2TestsMixin, ReusedConnectTestCase):
         super().setUp()
         self.spark._profiler_collector._value = None
 
+
+class UDFProfilerWithoutPlanCacheParityTests(UDFProfilerParityTests):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.spark.conf.set("spark.connect.session.planCache.enabled", False)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            cls.spark.conf.unset("spark.connect.session.planCache.enabled")
+        finally:
+            super().tearDownClass()
+
     def test_perf_profiler_udf_multiple_actions(self):
         def action(df):
             df.collect()
@@ -35,6 +49,7 @@ class UDFProfilerParityTests(UDFProfiler2TestsMixin, ReusedConnectTestCase):
         with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
             _do_computation(self.spark, action=action)
 
+        # Without the plan cache, UDF ID will be different for each action
         self.assertEqual(6, len(self.profile_results), str(list(self.profile_results)))
 
         for id in self.profile_results:
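
A note on the test changes above: the new *WithoutPlanCacheParityTests classes rerun the profiler suites with the server-side plan cache disabled via a session conf. A minimal sketch of the same toggle from client code, assuming an active Spark Connect session (the query is illustrative):

```python
# Disable the server-side plan cache for this session, as the new
# *WithoutPlanCacheParityTests classes do in setUpClass().
spark.conf.set("spark.connect.session.planCache.enabled", False)
try:
    spark.sql("SELECT 1").collect()  # executes without plan caching
finally:
    # Restore the default, mirroring tearDownClass().
    spark.conf.unset("spark.connect.session.planCache.enabled")
```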


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
