This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 52fe51b6e9b3 [SPARK-54112][CONNECT] Support getSchemas for SparkConnectDatabaseMetaData
52fe51b6e9b3 is described below

commit 52fe51b6e9b366f4fdf108a5563a5a284c0bf4da
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 10 10:04:33 2025 -0800

    [SPARK-54112][CONNECT] Support getSchemas for SparkConnectDatabaseMetaData
    
    ### What changes were proposed in this pull request?
    
    Implement the `getSchemas` methods defined in `java.sql.DatabaseMetaData` for `SparkConnectDatabaseMetaData`.
    
    ```java
        /**
         * Retrieves the schema names available in this database.  The results
         * are ordered by {@code TABLE_CATALOG} and
         * {@code TABLE_SCHEM}.
         *
         * <P>The schema columns are:
         *  <OL>
         *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
         *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
         *  </OL>
         *
         * @return a {@code ResultSet} object in which each row is a
         *         schema description
         * @throws SQLException if a database access error occurs
         */
        ResultSet getSchemas() throws SQLException;

        /**
         * Retrieves the schema names available in this database.  The results
         * are ordered by {@code TABLE_CATALOG} and
         * {@code TABLE_SCHEM}.
         *
         * <P>The schema columns are:
         *  <OL>
         *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
         *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
         *  </OL>
         *
         * @param catalog a catalog name; must match the catalog name as it is stored
         * in the database; "" retrieves those without a catalog; null means catalog
         * name should not be used to narrow down the search.
         * @param schemaPattern a schema name; must match the schema name as it is
         * stored in the database; null means
         * schema name should not be used to narrow down the search.
         * @return a {@code ResultSet} object in which each row is a
         *         schema description
         * @throws SQLException if a database access error occurs
         * @see #getSearchStringEscape
         * @since 1.6
         */
        ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;
    ```
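
    For illustration, a minimal usage sketch from the JDBC client side (the connection URL below is a placeholder, not the driver's confirmed scheme):

    ```scala
    import java.sql.DriverManager

    // Placeholder URL; the actual Connect JDBC URL may differ.
    val conn = DriverManager.getConnection("jdbc:sc://localhost:15002")
    try {
      val md = conn.getMetaData
      // All schemas in all catalogs, ordered by TABLE_CATALOG, TABLE_SCHEM.
      val all = md.getSchemas()
      while (all.next()) {
        println(s"${all.getString("TABLE_CATALOG")}.${all.getString("TABLE_SCHEM")}")
      }
      // Schemas in a specific catalog matching a SQL LIKE pattern; '_' and '%'
      // are wildcards and can be escaped with getSearchStringEscape, i.e. '\'.
      val filtered = md.getSchemas("spark_catalog", "db%")
      while (filtered.next()) {
        println(filtered.getString("TABLE_SCHEM"))
      }
    } finally conn.close()
    ```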
    
    ### Why are the changes needed?
    
    Enhance API coverage of the Connect JDBC driver. For example, the `get[Catalogs|Schemas|Tables|...]` APIs are used by SQL GUI tools such as DBeaver to display the catalog/schema tree.
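
    As a rough, simplified sketch of the metadata walk such a tool performs over `java.sql.DatabaseMetaData` (the helper below is hypothetical):

    ```scala
    import java.sql.DatabaseMetaData

    // Hypothetical tree walk over JDBC metadata, similar to what a GUI tool does:
    // list catalogs, then the schemas under each catalog.
    def printCatalogTree(md: DatabaseMetaData): Unit = {
      val catalogs = md.getCatalogs()
      while (catalogs.next()) {
        val cat = catalogs.getString("TABLE_CAT") // standard JDBC column name
        val schemas = md.getSchemas(cat, null)    // all schemas in this catalog
        while (schemas.next()) {
          println(s"$cat / ${schemas.getString("TABLE_SCHEM")}")
        }
      }
    }
    ```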
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the Connect JDBC driver is a new feature under development.
    
    ### How was this patch tested?
    
    A new UT is added; also tested via DBeaver - the catalog/schema tree works now.
    
    <img width="1260" height="892" alt="Xnip2025-11-01_01-33-38" src="https://github.com/user-attachments/assets/ca678627-e07c-430a-9750-e7ea1d69aecf" />
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52819 from pan3793/SPARK-54112.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../client/jdbc/SparkConnectDatabaseMetaData.scala |  67 +++++++++++--
 .../jdbc/SparkConnectDatabaseMetaDataSuite.scala   | 108 +++++++++++++++++++++
 .../apache/spark/sql/connect/test/SQLHelper.scala  |  12 +++
 3 files changed, 181 insertions(+), 6 deletions(-)

diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
index 215c8256acbc..13dd4d57662b 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.connect.client.jdbc
 import java.sql.{Array => _, _}
 
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils._
+import org.apache.spark.sql.connect
 import org.apache.spark.sql.connect.client.jdbc.SparkConnectDatabaseMetaData._
+import org.apache.spark.sql.functions._
 import org.apache.spark.util.VersionUtils
 
 class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
@@ -97,8 +101,7 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
   override def getTimeDateFunctions: String =
     throw new SQLFeatureNotSupportedException
 
-  override def getSearchStringEscape: String =
-    throw new SQLFeatureNotSupportedException
+  override def getSearchStringEscape: String = "\\"
 
   override def getExtraNameCharacters: String = ""
 
@@ -277,6 +280,9 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
 
   override def dataDefinitionIgnoredInTransactions: Boolean = false
 
+  private def isNullOrWildcard(pattern: String): Boolean =
+    pattern == null || pattern == "%"
+
   override def getProcedures(
       catalog: String,
       schemaPattern: String,
@@ -299,11 +305,60 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
     new SparkConnectResultSet(df.collectResult())
   }
 
-  override def getSchemas: ResultSet =
-    throw new SQLFeatureNotSupportedException
+  override def getSchemas: ResultSet = {
+    conn.checkOpen()
 
-  override def getSchemas(catalog: String, schemaPattern: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+    getSchemas(null, null)
+  }
+
+  // Schema of the returned DataFrame is:
+  // |-- TABLE_SCHEM: string (nullable = false)
+  // |-- TABLE_CATALOG: string (nullable = false)
+  private def getSchemasDataFrame(
+      catalog: String, schemaPattern: String): connect.DataFrame = {
+
+    val schemaFilterExpr = if (isNullOrWildcard(schemaPattern)) {
+      lit(true)
+    } else {
+      $"TABLE_SCHEM".like(schemaPattern)
+    }
+
+    def internalGetSchemas(
+        catalogOpt: Option[String],
+        schemaFilterExpr: Column): connect.DataFrame = {
+      val catalog = catalogOpt.getOrElse(conn.getCatalog)
+      // Spark SQL supports LIKE clause in SHOW SCHEMAS command, but we can't use that
+      // because the LIKE pattern does not follow SQL standard.
+      conn.spark.sql(s"SHOW SCHEMAS IN ${quoteIdentifier(catalog)}")
+        .select($"namespace".as("TABLE_SCHEM"))
+        .filter(schemaFilterExpr)
+        .withColumn("TABLE_CATALOG", lit(catalog))
+    }
+
+    if (catalog == null) {
+      // search in all catalogs
+      val emptyDf = conn.spark.emptyDataFrame
+        .withColumn("TABLE_SCHEM", lit(""))
+        .withColumn("TABLE_CATALOG", lit(""))
+      conn.spark.catalog.listCatalogs().collect().map(_.name).map { c =>
+        internalGetSchemas(Some(c), schemaFilterExpr)
+      }.fold(emptyDf) { (l, r) => l.unionAll(r) }
+    } else if (catalog == "") {
+      // search only in current catalog
+      internalGetSchemas(None, schemaFilterExpr)
+    } else {
+      // search in the specific catalog
+      internalGetSchemas(Some(catalog), schemaFilterExpr)
+    }
+  }
+
+  override def getSchemas(catalog: String, schemaPattern: String): ResultSet = {
+    conn.checkOpen()
+
+    val df = getSchemasDataFrame(catalog, schemaPattern)
+      .orderBy("TABLE_CATALOG", "TABLE_SCHEM")
+    new SparkConnectResultSet(df.collectResult())
+  }
 
   override def getTableTypes: ResultSet =
     throw new SQLFeatureNotSupportedException
diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 42596b56f4c5..c3d891bc38c3 100644
--- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -69,6 +69,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       assert(metadata.storesLowerCaseQuotedIdentifiers === false)
       assert(metadata.storesMixedCaseQuotedIdentifiers === false)
       assert(metadata.getIdentifierQuoteString === "`")
+      assert(metadata.getSearchStringEscape === "\\")
       assert(metadata.getExtraNameCharacters === "")
       assert(metadata.supportsAlterTableWithAddColumn === true)
       assert(metadata.supportsAlterTableWithDropColumn === true)
@@ -235,4 +236,111 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       }
     }
   }
+
+  test("SparkConnectDatabaseMetaData getSchemas") {
+
+    def verifyGetSchemas(
+        getSchemas: () => ResultSet)(verify: Seq[(String, String)] => Unit): Unit = {
+      Using.resource(getSchemas()) { rs =>
+        val catalogDatabases = new Iterator[(String, String)] {
+          def hasNext: Boolean = rs.next()
+          def next(): (String, String) =
+            (rs.getString("TABLE_CATALOG"), rs.getString("TABLE_SCHEM"))
+        }.toSeq
+        verify(catalogDatabases)
+      }
+    }
+
+    withConnection { conn =>
+      implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+      registerCatalog("test`cat", TEST_IN_MEMORY_CATALOG)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db1")
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db2")
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db_")
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db2")
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.test_db3")
+
+      val metadata = conn.getMetaData
+
+      // no need to care about "test`cat" because it is memory-based and session-isolated,
+      // and is inaccessible from another SparkSession
+      withDatabase("spark_catalog.db1", "spark_catalog.db2", "spark_catalog.test_db3") {
+        // list schemas in all catalogs
+        val getSchemasInAllCatalogs = (() => metadata.getSchemas) ::
+          List(null, "%").map { database => () => metadata.getSchemas(null, database) } ::: Nil
+
+        getSchemasInAllCatalogs.foreach { getSchemas =>
+          verifyGetSchemas(getSchemas) { catalogDatabases =>
+            // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+            assert {
+              catalogDatabases === Seq(
+                ("spark_catalog", "db1"),
+                ("spark_catalog", "db2"),
+                ("spark_catalog", "default"),
+                ("spark_catalog", "test_db3"),
+                ("test`cat", "t_db1"),
+                ("test`cat", "t_db2"),
+                ("test`cat", "t_db_"))
+            }
+          }
+        }
+
+        // list schemas in current catalog
+        assert(conn.getCatalog === "spark_catalog")
+        val getSchemasInCurrentCatalog =
+          List(null, "%").map { database => () => metadata.getSchemas("", database) }
+        getSchemasInCurrentCatalog.foreach { getSchemas =>
+          verifyGetSchemas(getSchemas) { catalogDatabases =>
+            // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+            assert {
+              catalogDatabases === Seq(
+                ("spark_catalog", "db1"),
+                ("spark_catalog", "db2"),
+                ("spark_catalog", "default"),
+                ("spark_catalog", "test_db3"))
+            }
+          }
+        }
+
+        // list schemas with schema pattern
+        verifyGetSchemas { () => metadata.getSchemas(null, "db%") } { catalogDatabases =>
+          // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+          assert {
+            catalogDatabases === Seq(
+              ("spark_catalog", "db1"),
+              ("spark_catalog", "db2"))
+          }
+        }
+
+        verifyGetSchemas { () => metadata.getSchemas(null, "db_") } { catalogDatabases =>
+          // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+          assert {
+            catalogDatabases === Seq(
+              ("spark_catalog", "db1"),
+              ("spark_catalog", "db2"))
+          }
+        }
+
+        // escape backtick in catalog, and _ in schema pattern
+        verifyGetSchemas {
+          () => metadata.getSchemas("test`cat", "t\\_db\\_")
+        } { catalogDatabases =>
+          assert(catalogDatabases === Seq(("test`cat", "t_db_")))
+        }
+
+        // skip testing escape of ' and % in the schema pattern, because Spark SQL does not
+        // allow those chars in schema/table names.
+        //
+        //   CREATE DATABASE IF NOT EXISTS `t_db1'`;
+        //
+        // the above SQL fails with error condition:
+        //   [INVALID_SCHEMA_OR_RELATION_NAME] `t_db1'` is not a valid name for tables/schemas.
+        //   Valid names only contain alphabet characters, numbers and _. SQLSTATE: 42602
+      }
+    }
+  }
 }
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
index b8d1062c3b3b..731550363fc0 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
@@ -142,4 +142,16 @@ trait SQLHelper {
       spark.sql(s"DROP VIEW IF EXISTS $name")
     })
   }
+
+  /**
+   * Drops the databases `dbNames` after calling `f`, then resets the current database to `default`.
+   */
+  protected def withDatabase(dbNames: String*)(f: => Unit): Unit = {
+    SparkErrorUtils.tryWithSafeFinally(f) {
+      dbNames.foreach { name =>
+        spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
+      }
+      spark.sql(s"USE default")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
