This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 003e44fe309 [SPARK-41716][CONNECT] Rename _catalog_to_pandas to _execute_and_fetch in Catalog 003e44fe309 is described below commit 003e44fe309cae0ed9a787b7bb581a1739230fab Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Tue Feb 7 00:33:57 2023 -0800 [SPARK-41716][CONNECT] Rename _catalog_to_pandas to _execute_and_fetch in Catalog ### What changes were proposed in this pull request? This PR proposes to rename `_catalog_to_pandas` to `_execute_and_fetch`, and to remove the JIRA reference (SPARK-41716) from the TODO comment above it. ### Why are the changes needed? I tried to factor this method out into the client, but realized it would make little difference. In addition, the Catalog API needs a new design, so I propose removing the JIRA reference for now. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test cases should cover. Closes #39920 from HyukjinKwon/SPARK-41716-1. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit b30ba716131eec19bf07c1c47f73e9b3e87c93e4) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- python/pyspark/sql/connect/catalog.py | 51 +++++++++++++++++------------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/python/pyspark/sql/connect/catalog.py b/python/pyspark/sql/connect/catalog.py index 753b00755ad..b7ea44e831e 100644 --- a/python/pyspark/sql/connect/catalog.py +++ b/python/pyspark/sql/connect/catalog.py @@ -42,26 +42,25 @@ class Catalog: def __init__(self, sparkSession: "SparkSession") -> None: self._sparkSession = sparkSession - # TODO(SPARK-41716): Probably should factor out to pyspark.sql.connect.client. 
- def _catalog_to_pandas(self, catalog: plan.LogicalPlan) -> pd.DataFrame: + def _execute_and_fetch(self, catalog: plan.LogicalPlan) -> pd.DataFrame: pdf = DataFrame.withPlan(catalog, session=self._sparkSession).toPandas() assert pdf is not None return pdf def currentCatalog(self) -> str: - pdf = self._catalog_to_pandas(plan.CurrentCatalog()) + pdf = self._execute_and_fetch(plan.CurrentCatalog()) assert pdf is not None return pdf.iloc[0].iloc[0] currentCatalog.__doc__ = PySparkCatalog.currentCatalog.__doc__ def setCurrentCatalog(self, catalogName: str) -> None: - self._catalog_to_pandas(plan.SetCurrentCatalog(catalog_name=catalogName)) + self._execute_and_fetch(plan.SetCurrentCatalog(catalog_name=catalogName)) setCurrentCatalog.__doc__ = PySparkCatalog.setCurrentCatalog.__doc__ def listCatalogs(self) -> List[CatalogMetadata]: - pdf = self._catalog_to_pandas(plan.ListCatalogs()) + pdf = self._execute_and_fetch(plan.ListCatalogs()) return [ CatalogMetadata(name=row.iloc[0], description=row.iloc[1]) for _, row in pdf.iterrows() ] @@ -69,19 +68,19 @@ class Catalog: listCatalogs.__doc__ = PySparkCatalog.listCatalogs.__doc__ def currentDatabase(self) -> str: - pdf = self._catalog_to_pandas(plan.CurrentDatabase()) + pdf = self._execute_and_fetch(plan.CurrentDatabase()) assert pdf is not None return pdf.iloc[0].iloc[0] currentDatabase.__doc__ = PySparkCatalog.currentDatabase.__doc__ def setCurrentDatabase(self, dbName: str) -> None: - self._catalog_to_pandas(plan.SetCurrentDatabase(db_name=dbName)) + self._execute_and_fetch(plan.SetCurrentDatabase(db_name=dbName)) setCurrentDatabase.__doc__ = PySparkCatalog.setCurrentDatabase.__doc__ def listDatabases(self) -> List[Database]: - pdf = self._catalog_to_pandas(plan.ListDatabases()) + pdf = self._execute_and_fetch(plan.ListDatabases()) return [ Database( name=row.iloc[0], @@ -95,7 +94,7 @@ class Catalog: listDatabases.__doc__ = PySparkCatalog.listDatabases.__doc__ def getDatabase(self, dbName: str) -> Database: - pdf = 
self._catalog_to_pandas(plan.GetDatabase(db_name=dbName)) + pdf = self._execute_and_fetch(plan.GetDatabase(db_name=dbName)) assert pdf is not None row = pdf.iloc[0] return Database( @@ -108,14 +107,14 @@ class Catalog: getDatabase.__doc__ = PySparkCatalog.getDatabase.__doc__ def databaseExists(self, dbName: str) -> bool: - pdf = self._catalog_to_pandas(plan.DatabaseExists(db_name=dbName)) + pdf = self._execute_and_fetch(plan.DatabaseExists(db_name=dbName)) assert pdf is not None return pdf.iloc[0].iloc[0] databaseExists.__doc__ = PySparkCatalog.databaseExists.__doc__ def listTables(self, dbName: Optional[str] = None) -> List[Table]: - pdf = self._catalog_to_pandas(plan.ListTables(db_name=dbName)) + pdf = self._execute_and_fetch(plan.ListTables(db_name=dbName)) return [ Table( name=row.iloc[0], @@ -132,7 +131,7 @@ class Catalog: listTables.__doc__ = PySparkCatalog.listTables.__doc__ def getTable(self, tableName: str) -> Table: - pdf = self._catalog_to_pandas(plan.GetTable(table_name=tableName)) + pdf = self._execute_and_fetch(plan.GetTable(table_name=tableName)) assert pdf is not None row = pdf.iloc[0] return Table( @@ -148,7 +147,7 @@ class Catalog: getTable.__doc__ = PySparkCatalog.getTable.__doc__ def listFunctions(self, dbName: Optional[str] = None) -> List[Function]: - pdf = self._catalog_to_pandas(plan.ListFunctions(db_name=dbName)) + pdf = self._execute_and_fetch(plan.ListFunctions(db_name=dbName)) return [ Function( name=row.iloc[0], @@ -165,7 +164,7 @@ class Catalog: listFunctions.__doc__ = PySparkCatalog.listFunctions.__doc__ def functionExists(self, functionName: str, dbName: Optional[str] = None) -> bool: - pdf = self._catalog_to_pandas( + pdf = self._execute_and_fetch( plan.FunctionExists(function_name=functionName, db_name=dbName) ) assert pdf is not None @@ -174,7 +173,7 @@ class Catalog: functionExists.__doc__ = PySparkCatalog.functionExists.__doc__ def getFunction(self, functionName: str) -> Function: - pdf = 
self._catalog_to_pandas(plan.GetFunction(function_name=functionName)) + pdf = self._execute_and_fetch(plan.GetFunction(function_name=functionName)) assert pdf is not None row = pdf.iloc[0] return Function( @@ -190,7 +189,7 @@ class Catalog: getFunction.__doc__ = PySparkCatalog.getFunction.__doc__ def listColumns(self, tableName: str, dbName: Optional[str] = None) -> List[Column]: - pdf = self._catalog_to_pandas(plan.ListColumns(table_name=tableName, db_name=dbName)) + pdf = self._execute_and_fetch(plan.ListColumns(table_name=tableName, db_name=dbName)) return [ Column( name=row.iloc[0], @@ -206,7 +205,7 @@ class Catalog: listColumns.__doc__ = PySparkCatalog.listColumns.__doc__ def tableExists(self, tableName: str, dbName: Optional[str] = None) -> bool: - pdf = self._catalog_to_pandas(plan.TableExists(table_name=tableName, db_name=dbName)) + pdf = self._execute_and_fetch(plan.TableExists(table_name=tableName, db_name=dbName)) assert pdf is not None return pdf.iloc[0].iloc[0] @@ -257,53 +256,53 @@ class Catalog: createTable.__doc__ = PySparkCatalog.createTable.__doc__ def dropTempView(self, viewName: str) -> bool: - pdf = self._catalog_to_pandas(plan.DropTempView(view_name=viewName)) + pdf = self._execute_and_fetch(plan.DropTempView(view_name=viewName)) assert pdf is not None return pdf.iloc[0].iloc[0] dropTempView.__doc__ = PySparkCatalog.dropTempView.__doc__ def dropGlobalTempView(self, viewName: str) -> bool: - pdf = self._catalog_to_pandas(plan.DropGlobalTempView(view_name=viewName)) + pdf = self._execute_and_fetch(plan.DropGlobalTempView(view_name=viewName)) assert pdf is not None return pdf.iloc[0].iloc[0] dropGlobalTempView.__doc__ = PySparkCatalog.dropGlobalTempView.__doc__ def isCached(self, tableName: str) -> bool: - pdf = self._catalog_to_pandas(plan.IsCached(table_name=tableName)) + pdf = self._execute_and_fetch(plan.IsCached(table_name=tableName)) assert pdf is not None return pdf.iloc[0].iloc[0] isCached.__doc__ = PySparkCatalog.isCached.__doc__ def 
cacheTable(self, tableName: str) -> None: - self._catalog_to_pandas(plan.CacheTable(table_name=tableName)) + self._execute_and_fetch(plan.CacheTable(table_name=tableName)) cacheTable.__doc__ = PySparkCatalog.cacheTable.__doc__ def uncacheTable(self, tableName: str) -> None: - self._catalog_to_pandas(plan.UncacheTable(table_name=tableName)) + self._execute_and_fetch(plan.UncacheTable(table_name=tableName)) uncacheTable.__doc__ = PySparkCatalog.uncacheTable.__doc__ def clearCache(self) -> None: - self._catalog_to_pandas(plan.ClearCache()) + self._execute_and_fetch(plan.ClearCache()) clearCache.__doc__ = PySparkCatalog.clearCache.__doc__ def refreshTable(self, tableName: str) -> None: - self._catalog_to_pandas(plan.RefreshTable(table_name=tableName)) + self._execute_and_fetch(plan.RefreshTable(table_name=tableName)) refreshTable.__doc__ = PySparkCatalog.refreshTable.__doc__ def recoverPartitions(self, tableName: str) -> None: - self._catalog_to_pandas(plan.RecoverPartitions(table_name=tableName)) + self._execute_and_fetch(plan.RecoverPartitions(table_name=tableName)) recoverPartitions.__doc__ = PySparkCatalog.recoverPartitions.__doc__ def refreshByPath(self, path: str) -> None: - self._catalog_to_pandas(plan.RefreshByPath(path=path)) + self._execute_and_fetch(plan.RefreshByPath(path=path)) refreshByPath.__doc__ = PySparkCatalog.refreshByPath.__doc__ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org