This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f87fe2f513d [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
6f87fe2f513d is described below

commit 6f87fe2f513d1b1a022f0d03b6c81d73d7cfb228
Author: Martin Grund <mar...@databricks.com>
AuthorDate: Fri Feb 2 08:49:06 2024 -0400

    [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
    
    ### What changes were proposed in this pull request?
    This patch caches the result of the `df.schema` call in the DataFrame to
    avoid an extra round trip to the Spark Connect service when retrieving the
    columns or the schema. Since the DataFrame is immutable, its schema will
    not change.
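    
    For illustration, a minimal, self-contained sketch of the lazy caching
    pattern this patch applies; `FakeClient` and `DataFrameSketch` are
    hypothetical stand-ins, not the real Spark Connect classes:
    
    ```python
    from typing import Optional
    
    class FakeClient:
        """Hypothetical stand-in for the Spark Connect client; counts RPCs."""
        def __init__(self) -> None:
            self.rpc_calls = 0
    
        def schema(self, query: str) -> str:
            self.rpc_calls += 1
            return "struct<id:bigint,name:string>"
    
    class DataFrameSketch:
        """Sketch of the lazy schema cache: the RPC fires at most once."""
        def __init__(self, client: FakeClient) -> None:
            self._client = client
            self._cached_schema: Optional[str] = None
    
        @property
        def schema(self) -> str:
            if self._cached_schema is None:
                # First access: one analysis round trip to the server.
                self._cached_schema = self._client.schema("<plan proto>")
            return self._cached_schema
    
    client = FakeClient()
    df = DataFrameSketch(client)
    _ = df.schema  # triggers the RPC
    _ = df.schema  # served from the cache
    assert client.rpc_calls == 1
    ```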
    
    ### Why are the changes needed?
    Performance / Stability
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UT
    
    Closes #42499 from grundprinzip/SPARK-44815.
    
    Lead-authored-by: Martin Grund <mar...@databricks.com>
    Co-authored-by: Herman van Hovell <her...@databricks.com>
    Co-authored-by: Martin Grund <martin.gr...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../jvm/src/main/scala/org/apache/spark/sql/Dataset.scala   | 13 ++++++++++++-
 python/pyspark/sql/connect/dataframe.py                     | 12 ++++++++++--
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 949f53409386..9a42afebf8f2 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -244,7 +244,18 @@ class Dataset[T] private[sql] (
    * @group basic
    * @since 3.4.0
    */
-  def schema: StructType = {
+  def schema: StructType = cachedSchema
+
+  /**
+   * The cached schema.
+   *
+   * Schema caching is correct in most cases. Connect is lazy by nature. This means that we
+   * only resolve the plan when it is submitted for execution or analysis. We do not cache
+   * intermediate resolved plans. If the input of the plan (a changed table, a view
+   * redefinition, etc.) changes between the schema() call and a subsequent action, the
+   * cached schema might be inconsistent with the actual schema.
+   */
+  private lazy val cachedSchema: StructType = {
     DataTypeProtoConverter
       .toCatalystType(
         sparkSession
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 6d37158142a6..4091016e0d59 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -120,6 +120,7 @@ class DataFrame:
         # Check whether _repr_html is supported or not, we use it to avoid calling RPC twice
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
+        self._cached_schema: Optional[StructType] = None
 
     def __repr__(self) -> str:
         if not self._support_repr_html:
@@ -1782,8 +1783,15 @@ class DataFrame:
 
     @property
     def schema(self) -> StructType:
-        query = self._plan.to_proto(self._session.client)
-        return self._session.client.schema(query)
+        # Schema caching is correct in most cases. Connect is lazy by nature. This means
+        # that we only resolve the plan when it is submitted for execution or analysis.
+        # We do not cache intermediate resolved plans. If the input of the plan (a changed
+        # table, a view redefinition, etc.) changes between the schema() call and a
+        # subsequent action, the cached schema might be inconsistent with the actual schema.
+        if self._cached_schema is None:
+            query = self._plan.to_proto(self._session.client)
+            self._cached_schema = self._session.client.schema(query)
+        return self._cached_schema
 
     schema.__doc__ = PySparkDataFrame.schema.__doc__
 

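As the new comments note, the cache can go stale if the plan's inputs change
between the schema() call and a later action. A hedged sketch of that scenario
(assumes a running Spark Connect server at sc://localhost; the temp view `v`
is made up for illustration):

```python
from pyspark.sql import SparkSession

# Hypothetical endpoint; requires a running Spark Connect server.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

spark.sql("CREATE OR REPLACE TEMP VIEW v AS SELECT 1 AS a")
df = spark.sql("SELECT * FROM v")
print(df.schema)  # first access: one analysis RPC, result is cached

# The plan's input changes between the schema() call and the next action.
spark.sql("CREATE OR REPLACE TEMP VIEW v AS SELECT 1 AS a, 2 AS b")

print(df.schema)     # still the cached one-column schema
print(df.collect())  # the action re-resolves the plan against the new view
```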
