This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6f87fe2f513d [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC 6f87fe2f513d is described below commit 6f87fe2f513d1b1a022f0d03b6c81d73d7cfb228 Author: Martin Grund <mar...@databricks.com> AuthorDate: Fri Feb 2 08:49:06 2024 -0400 [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC ### What changes were proposed in this pull request? This patch caches the result of the `df.schema` call in the DataFrame to avoid the extra roundtrip to the Spark Connect service to retrieve the columns or the schema. Since the Dataframe is immutable, the schema will not change. ### Why are the changes needed? Performance / Stability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT Closes #42499 from grundprinzip/SPARK-44815. Lead-authored-by: Martin Grund <mar...@databricks.com> Co-authored-by: Herman van Hovell <her...@databricks.com> Co-authored-by: Martin Grund <martin.gr...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../jvm/src/main/scala/org/apache/spark/sql/Dataset.scala | 13 ++++++++++++- python/pyspark/sql/connect/dataframe.py | 12 ++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 949f53409386..9a42afebf8f2 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -244,7 +244,18 @@ class Dataset[T] private[sql] ( * @group basic * @since 3.4.0 */ - def schema: StructType = { + def schema: StructType = cachedSchema + + /** + * The cached schema. + * + * Schema caching is correct in most cases. Connect is lazy by nature. This means that we only + * resolve the plan when it is submitted for execution or analysis. We do not cache intermediate + * resolved plans. If the input (changes table, view redefinition, etc...) of the plan changes + * between the schema() call, and a subsequent action, the cached schema might be inconsistent + * with the end schema. + */ + private lazy val cachedSchema: StructType = { DataTypeProtoConverter .toCatalystType( sparkSession diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 6d37158142a6..4091016e0d59 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -120,6 +120,7 @@ class DataFrame: # Check whether _repr_html is supported or not, we use it to avoid calling RPC twice # by __repr__ and _repr_html_ while eager evaluation opens. self._support_repr_html = False + self._cached_schema: Optional[StructType] = None def __repr__(self) -> str: if not self._support_repr_html: @@ -1782,8 +1783,15 @@ class DataFrame: @property def schema(self) -> StructType: - query = self._plan.to_proto(self._session.client) - return self._session.client.schema(query) + # Schema caching is correct in most cases. Connect is lazy by nature. This means that + # we only resolve the plan when it is submitted for execution or analysis. We do not + # cache intermediate resolved plan. If the input (changes table, view redefinition, + # etc...) of the plan changes between the schema() call, and a subsequent action, the + # cached schema might be inconsistent with the end schema. + if self._cached_schema is None: + query = self._plan.to_proto(self._session.client) + self._cached_schema = self._session.client.schema(query) + return self._cached_schema schema.__doc__ = PySparkDataFrame.schema.__doc__ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org