This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b2d249b1aa2 [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace b2d249b1aa2 is described below commit b2d249b1aa2a6e9ef573a542c8585a736b2873c6 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Fri Jun 24 09:34:02 2022 +0900 [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace ### What changes were proposed in this pull request? Corresponding changes in the python side of [SPARK-39236](https://issues.apache.org/jira/browse/SPARK-39236) (Make CreateTable API and ListTables API compatible) ### Why are the changes needed? to support 3-layer-namespace in the python side ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? 1, for existing UT, update them to cover original cases; 2, for 3-layer-namespace, manually test: - 2.1 move all `InMemoryCatalog` related files from `test` to `main`, by `mv sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemory*.scala sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/` - 2.2 fix a few conflicts - 2.3 compile and then manually test in the pyspark repl, for example: ```python spark.sql("CREATE DATABASE my_db") spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab") spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet") spark.sql("CREATE TABLE my_db.tab2 (name STRING, age INT) USING parquet") schema = StructType([StructField("a", IntegerType(), True)]) description = "this is a test table" spark.conf.set("spark.sql.catalog.testcat", "org.apache.spark.sql.connector.catalog.InMemoryCatalog") spark.catalog.createTable("testcat.my_db.my_table", source="json", schema=schema, description=description) In [2]: spark.catalog.listTables() Out[2]: [Table(name='tab1', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False), 
Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)] In [3]: spark.catalog.listTables("my_db") Out[3]: [Table(name='tab2', catalog='spark_catalog', namespace=['my_db'], description=None, tableType='MANAGED', isTemporary=False), Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)] In [4]: spark.catalog.listTables("testcat.my_db") Out[4]: [Table(name='my_table', catalog='testcat', namespace=['my_db'], description='this is a test table', tableType='MANAGED', isTemporary=False)] ``` Closes #36957 from zhengruifeng/py_table_create_list. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/catalog.py | 26 ++++++- python/pyspark/sql/tests/test_catalog.py | 128 ++++++++++++++++++++----------- 2 files changed, 107 insertions(+), 47 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index b954995f857..cd16be7ba19 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -37,11 +37,19 @@ class Database(NamedTuple): class Table(NamedTuple): name: str - database: Optional[str] + catalog: Optional[str] + namespace: Optional[List[str]] description: Optional[str] tableType: str isTemporary: bool + @property + def database(self) -> Optional[str]: + if self.namespace is not None and len(self.namespace) == 1: + return self.namespace[0] + else: + return None + class Column(NamedTuple): name: str @@ -127,6 +135,9 @@ class Catalog: If no database is specified, the current database is used. This includes all temporary views. + + .. versionchanged:: 3.4 + Allowed ``dbName`` to be qualified with catalog name. 
""" if dbName is None: dbName = self.currentDatabase() @@ -134,10 +145,18 @@ class Catalog: tables = [] while iter.hasNext(): jtable = iter.next() + + jnamespace = jtable.namespace() + if jnamespace is not None: + namespace = [jnamespace[i] for i in range(0, len(jnamespace))] + else: + namespace = None + tables.append( Table( name=jtable.name(), - database=jtable.database(), + catalog=jtable.catalog(), + namespace=namespace, description=jtable.description(), tableType=jtable.tableType(), isTemporary=jtable.isTemporary(), @@ -341,6 +360,9 @@ class Catalog: .. versionchanged:: 3.1 Added the ``description`` parameter. + + .. versionchanged:: 3.4 + Allowed ``tableName`` to be qualified with catalog name. """ if path is not None: options["path"] = path diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py index 2254261263f..b2bf83d7a0d 100644 --- a/python/pyspark/sql/tests/test_catalog.py +++ b/python/pyspark/sql/tests/test_catalog.py @@ -79,55 +79,93 @@ class CatalogTests(ReusedSQLTestCase): self.assertEqual(tables, tablesDefault) self.assertEqual(len(tables), 3) self.assertEqual(len(tablesSomeDb), 2) - self.assertEqual( - tables[0], - Table( - name="tab1", - database="default", - description=None, - tableType="MANAGED", - isTemporary=False, - ), - ) - self.assertEqual( - tables[1], - Table( - name="tab3_via_catalog", - database="default", + + # make table in old fashion + def makeTable( + name, + database, + description, + tableType, + isTemporary, + ): + return Table( + name=name, + catalog=None, + namespace=[database] if database is not None else None, description=description, - tableType="MANAGED", - isTemporary=False, - ), + tableType=tableType, + isTemporary=isTemporary, + ) + + # compare tables in old fashion + def compareTables(t1, t2): + return ( + t1.name == t2.name + and t1.database == t2.database + and t1.description == t2.description + and t1.tableType == t2.tableType + and t1.isTemporary == t2.isTemporary + ) + + 
self.assertTrue( + compareTables( + tables[0], + makeTable( + name="tab1", + database="default", + description=None, + tableType="MANAGED", + isTemporary=False, + ), + ) + ) + self.assertTrue( + compareTables( + tables[1], + makeTable( + name="tab3_via_catalog", + database="default", + description=description, + tableType="MANAGED", + isTemporary=False, + ), + ) ) - self.assertEqual( - tables[2], - Table( - name="temp_tab", - database=None, - description=None, - tableType="TEMPORARY", - isTemporary=True, - ), + self.assertTrue( + compareTables( + tables[2], + makeTable( + name="temp_tab", + database=None, + description=None, + tableType="TEMPORARY", + isTemporary=True, + ), + ) ) - self.assertEqual( - tablesSomeDb[0], - Table( - name="tab2", - database="some_db", - description=None, - tableType="MANAGED", - isTemporary=False, - ), + self.assertTrue( + compareTables( + tablesSomeDb[0], + makeTable( + name="tab2", + database="some_db", + description=None, + tableType="MANAGED", + isTemporary=False, + ), + ) ) - self.assertEqual( - tablesSomeDb[1], - Table( - name="temp_tab", - database=None, - description=None, - tableType="TEMPORARY", - isTemporary=True, - ), + self.assertTrue( + compareTables( + tablesSomeDb[1], + makeTable( + name="temp_tab", + database=None, + description=None, + tableType="TEMPORARY", + isTemporary=True, + ), + ) ) self.assertRaisesRegex( AnalysisException, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org