This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e56c1cf8c1 Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables (#6428)
e56c1cf8c1 is described below

commit e56c1cf8c16f527bf91b74399602138e215f4186
Author: Dennis Huo <[email protected]>
AuthorDate: Fri Jan 13 19:57:46 2023 -0800

    Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables (#6428)
    
    * Initial read-only Snowflake Catalog implementation by @sfc-gh-mparmar (#1)
    
    Initial read-only Snowflake Catalog implementation built on top of the Snowflake JDBC driver,
    providing support for basic listing of namespaces, listing of tables, and loading/reads of tables.
    
    Auth options are passed through to the JDBC driver.
    
    Co-authored-by: Maninder Parmar <[email protected]>
    Co-authored-by: Maninder Parmar <[email protected]>
    Co-authored-by: Dennis Huo <[email protected]>
    
    * Add JdbcSnowflakeClientTest using mocks (#2)
    
    Add JdbcSnowflakeClientTest using mocks; provides full coverage of JdbcSnowflakeClient
    and entities' ResultSetHandler logic.
    
    Also update target Spark runtime versions to be included.
    
    * Add test { useJUnitPlatform() } tuple to iceberg-snowflake for
    consistency and future interoperability with inheriting from abstract
    unittest base classes.
    
    * Extract versions into versions.props per PR review
    
    * Misc test-related refactors per review suggestions
    - Convert unittests to all use assertj/Assertions for "fluent assertions"
    - Refactor test injection into overloaded initialize() method
    - Add test cases for close() propagation
    - Use CloseableGroup.
    
    * Fix unsupported behaviors of loadNamespaceMetadata and defaultWarehouseLocation
    
    * Move TableIdentifier checks out of newTableOps into the
    SnowflakeTableOperations class itself, add test case.
    
    * Refactor out any Namespace-related business logic from the lower
    SnowflakeClient/JdbcSnowflakeClient layers and merge SnowflakeTable
    and SnowflakeSchema into a single SnowflakeIdentifier that also
    encompasses ROOT and DATABASE level identifiers.
    
    A SnowflakeIdentifier thus functions like a type-checked/constrained
    Iceberg TableIdentifier, and eliminates any tight coupling between
    a SnowflakeClient and Catalog business logic.
    
    Parsing of Namespace numerical levels into a SnowflakeIdentifier
    is now fully encapsulated in NamespaceHelpers so that callsites
    don't duplicate namespace-handling/validation logic.
    
    * Finish migrating JdbcSnowflakeClientTest off any usage of org.junit.Assert
    in favor of assertj's Assertions.
    
    * Style refactorings from review comments, expanded and moved InMemoryFileIO into core
    with its own unittest.
    
    * Fix behavior of getNamespaceMetadata to throw when the namespace doesn't
    exist.
    
    Refactor for naming conventions and consolidating identifier
    handling into NamespaceHelpers.
    
    Make FileIO instantiated fresh for each newTableOps call.
    
    * Move private constructor to top, add assertion to test case.
    
    * Define minimal ResultSetParser/QueryHarness classes to fully replace
    any use of commons-dbutils; refactor ResultSet handling fully into
    JdbcSnowflakeClient.java.
    
    * Update snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
    
    Co-authored-by: Eduard Tudenhöfner <[email protected]>
    
    * Refactor style suggestions; remove debug-level logging, arguments in exceptions,
    private members if not accessed outside, move precondition checks, add test for
    NamespaceHelpers.
    
    * Fix precondition messages, remove getConf()
    
    * Clean up varargs.
    
    * Make data members final, include rawJsonVal in toString for debuggability.
    
    * Combine some small test cases into roundtrip test cases, misc cleanup
    
    * Add comment for why a factory class is exposed for testing purposes.
    
    Co-authored-by: Dennis Huo <[email protected]>
    Co-authored-by: Maninder Parmar <[email protected]>
    Co-authored-by: Maninder Parmar <[email protected]>
    Co-authored-by: Eduard Tudenhöfner <[email protected]>
---
 .github/labeler.yml                                |   4 +-
 build.gradle                                       |  18 +
 .../org/apache/iceberg/jdbc/JdbcClientPool.java    |   6 +-
 settings.gradle                                    |   2 +
 .../iceberg/snowflake/JdbcSnowflakeClient.java     | 378 +++++++++++++
 .../apache/iceberg/snowflake/NamespaceHelpers.java | 100 ++++
 .../apache/iceberg/snowflake/SnowflakeCatalog.java | 249 +++++++++
 .../apache/iceberg/snowflake/SnowflakeClient.java  |  64 +++
 .../iceberg/snowflake/SnowflakeIdentifier.java     | 133 +++++
 .../iceberg/snowflake/SnowflakeTableMetadata.java  | 150 +++++
 .../snowflake/SnowflakeTableOperations.java        |  98 ++++
 .../iceberg/snowflake/FakeSnowflakeClient.java     | 191 +++++++
 .../iceberg/snowflake/JdbcSnowflakeClientTest.java | 603 +++++++++++++++++++++
 .../iceberg/snowflake/NamespaceHelpersTest.java    | 115 ++++
 .../iceberg/snowflake/SnowflakeCatalogTest.java    | 298 ++++++++++
 spark/v3.1/build.gradle                            |   3 +
 spark/v3.2/build.gradle                            |   3 +
 spark/v3.3/build.gradle                            |   3 +
 versions.props                                     |   1 +
 19 files changed, 2415 insertions(+), 4 deletions(-)

diff --git a/.github/labeler.yml b/.github/labeler.yml
index 521e1a42aa..c623fbc6dd 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -81,4 +81,6 @@ ALIYUN:
 GCP:
   - gcp/**/*
 DELL:
-  - dell/**/*
\ No newline at end of file
+  - dell/**/*
+SNOWFLAKE:
+  - snowflake/**/*
diff --git a/build.gradle b/build.gradle
index 0fba8cf1a9..7b14f3b731 100644
--- a/build.gradle
+++ b/build.gradle
@@ -697,6 +697,24 @@ project(':iceberg-dell') {
   }
 }
 
+project(':iceberg-snowflake') {
+  test {
+    useJUnitPlatform()
+  }
+
+  dependencies {
+    implementation project(':iceberg-core')
+    implementation project(':iceberg-common')
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    implementation "com.fasterxml.jackson.core:jackson-databind"
+    implementation "com.fasterxml.jackson.core:jackson-core"
+
+    runtimeOnly("net.snowflake:snowflake-jdbc")
+
+    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
+  }
+}
+
 @Memoized
 boolean versionFileExists() {
   return file('version.txt').exists()
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
index daa04908f4..60e5eb49a4 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -27,12 +27,12 @@ import java.util.Properties;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ClientPoolImpl;
 
-class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
 
   private final String dbUrl;
   private final Map<String, String> properties;
 
-  JdbcClientPool(String dbUrl, Map<String, String> props) {
+  public JdbcClientPool(String dbUrl, Map<String, String> props) {
     this(
         Integer.parseInt(
             props.getOrDefault(
@@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
         props);
   }
 
-  JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
+  public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
     super(poolSize, SQLNonTransientConnectionException.class, true);
     properties = props;
     this.dbUrl = dbUrl;
diff --git a/settings.gradle b/settings.gradle
index d1a14abe5b..c5ac07e080 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -34,6 +34,7 @@ include 'hive-metastore'
 include 'nessie'
 include 'gcp'
 include 'dell'
+include 'snowflake'
 
 project(':api').name = 'iceberg-api'
 project(':common').name = 'iceberg-common'
@@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore'
 project(':nessie').name = 'iceberg-nessie'
 project(':gcp').name = 'iceberg-gcp'
 project(':dell').name = 'iceberg-dell'
+project(':snowflake').name = 'iceberg-snowflake'
 
 if (null != System.getProperty("allVersions")) {
   System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
new file mode 100644
index 0000000000..1618f76c10
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * This implementation of SnowflakeClient builds on top of Snowflake's JDBC driver to interact with
+ * Snowflake's Iceberg-aware resource model.
+ */
+class JdbcSnowflakeClient implements SnowflakeClient {
+  static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver";
+
+  @FunctionalInterface
+  interface ResultSetParser<T> {
+    T parse(ResultSet rs) throws SQLException;
+  }
+
+  /**
+   * This class wraps the basic boilerplate of setting up PreparedStatements and applying a
+   * ResultSetParser to translate a ResultSet into parsed objects. Allows easily injecting
+   * subclasses for debugging/testing purposes.
+   */
+  static class QueryHarness {
+    public <T> T query(Connection conn, String sql, ResultSetParser<T> parser, String... args)
+        throws SQLException {
+      try (PreparedStatement statement = conn.prepareStatement(sql)) {
+        if (args != null) {
+          for (int i = 0; i < args.length; ++i) {
+            statement.setString(i + 1, args[i]);
+          }
+        }
+
+        try (ResultSet rs = statement.executeQuery()) {
+          return parser.parse(rs);
+        }
+      }
+    }
+  }
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Database identifiers,
+   * containing "name" (representing databaseName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> DATABASE_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> databases = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("name");
+          databases.add(SnowflakeIdentifier.ofDatabase(databaseName));
+        }
+        return databases;
+      };
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Schema identifiers,
+   * containing "database_name" and "name" (representing schemaName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> SCHEMA_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> schemas = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("database_name");
+          String schemaName = rs.getString("name");
+          schemas.add(SnowflakeIdentifier.ofSchema(databaseName, schemaName));
+        }
+        return schemas;
+      };
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Table identifiers,
+   * containing "database_name", "schema_name", and "name" (representing tableName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> TABLE_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> tables = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("database_name");
+          String schemaName = rs.getString("schema_name");
+          String tableName = rs.getString("name");
+          tables.add(SnowflakeIdentifier.ofTable(databaseName, schemaName, tableName));
+        }
+        return tables;
+      };
+
+  /**
+   * Expects to handle ResultSets representing a single record holding Snowflake Iceberg metadata.
+   */
+  public static final ResultSetParser<SnowflakeTableMetadata> TABLE_METADATA_RESULT_SET_HANDLER =
+      rs -> {
+        if (!rs.next()) {
+          return null;
+        }
+
+        String rawJsonVal = rs.getString("METADATA");
+        return SnowflakeTableMetadata.parseJson(rawJsonVal);
+      };
+
+  private final JdbcClientPool connectionPool;
+  private QueryHarness queryHarness;
+
+  JdbcSnowflakeClient(JdbcClientPool conn) {
+    Preconditions.checkArgument(null != conn, "JdbcClientPool must be non-null");
+    connectionPool = conn;
+    queryHarness = new QueryHarness();
+  }
+
+  @VisibleForTesting
+  void setQueryHarness(QueryHarness queryHarness) {
+    this.queryHarness = queryHarness;
+  }
+
+  /**
+   * For rare cases where PreparedStatements aren't supported for user-supplied identifiers intended
+   * for use in special LIKE clauses, we can sanitize by "broadening" the identifier with
+   * single-character wildcards and manually post-filter client-side.
+   *
+   * <p>Note: This sanitization approach intentionally "broadens" the scope of matching results;
+   * callers must be able to handle this method returning an all-wildcard expression; i.e. the
+   * caller must treat the usage of the LIKE clause as only an optional optimization, and should
+   * post-filter for correctness as if the LIKE clause wasn't present in the query at all.
+   */
+  @VisibleForTesting
+  String sanitizeIdentifierWithWildcardForLikeClause(String identifier) {
+    // Restrict identifiers to the "Unquoted object identifiers" syntax documented at
+    // https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
+    //
+    // Use a strict allowlist of characters, replace everything *not* matching the character set
+    // with "_", which is used as a single-character wildcard in Snowflake.
+    String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
+    if (sanitized.startsWith("$")) {
+      sanitized = "_" + sanitized.substring(1);
+    }
+    return sanitized;
+  }
+
+  @Override
+  public boolean databaseExists(SnowflakeIdentifier database) {
+    Preconditions.checkArgument(
+        database.type() == SnowflakeIdentifier.Type.DATABASE,
+        "databaseExists requires a DATABASE identifier, got '%s'",
+        database);
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW DATABASES queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
+    final String finalQuery =
+        String.format(
+            "SHOW DATABASES LIKE '%s' IN ACCOUNT",
+            sanitizeIdentifierWithWildcardForLikeClause(database.databaseName()));
+    List<SnowflakeIdentifier> databases;
+    try {
+      databases =
+          connectionPool.run(
+              conn -> queryHarness.query(conn, finalQuery, DATABASE_RESULT_SET_HANDLER));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if database '%s' exists", database);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while checking if database '%s' exists", database);
+    }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way
+    // it can for predicates in SELECT statements.
+    databases.removeIf(db -> !database.databaseName().equalsIgnoreCase(db.databaseName()));
+    return !databases.isEmpty();
+  }
+
+  @Override
+  public boolean schemaExists(SnowflakeIdentifier schema) {
+    Preconditions.checkArgument(
+        schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "schemaExists requires a SCHEMA identifier, got '%s'",
+        schema);
+
+    if (!databaseExists(SnowflakeIdentifier.ofDatabase(schema.databaseName()))) {
+      return false;
+    }
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW SCHEMAS queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
+    final String finalQuery =
+        String.format(
+            "SHOW SCHEMAS LIKE '%s' IN DATABASE IDENTIFIER(?)",
+            sanitizeIdentifierWithWildcardForLikeClause(schema.schemaName()));
+    List<SnowflakeIdentifier> schemas;
+    try {
+      schemas =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, schema.databaseName()));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if schema '%s' exists", schema);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while checking if schema '%s' exists", schema);
+    }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way
+    // it can for predicates in SELECT statements.
+    schemas.removeIf(sc -> !schema.schemaName().equalsIgnoreCase(sc.schemaName()));
+    return !schemas.isEmpty();
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listDatabases() {
+    List<SnowflakeIdentifier> databases;
+    try {
+      databases =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn, "SHOW DATABASES IN ACCOUNT", DATABASE_RESULT_SET_HANDLER));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to list databases");
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(e, "Interrupted while listing databases");
+    }
+    databases.forEach(
+        db ->
+            Preconditions.checkState(
+                db.type() == SnowflakeIdentifier.Type.DATABASE,
+                "Expected DATABASE, got identifier '%s'",
+                db));
+    return databases;
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope) {
+    StringBuilder baseQuery = new StringBuilder("SHOW SCHEMAS");
+    String[] queryParams = null;
+    switch (scope.type()) {
+      case ROOT:
+        // account-level listing
+        baseQuery.append(" IN ACCOUNT");
+        break;
+      case DATABASE:
+        // database-level listing
+        baseQuery.append(" IN DATABASE IDENTIFIER(?)");
+        queryParams = new String[] {scope.toIdentifierString()};
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listSchemas: %s", scope));
+    }
+
+    final String finalQuery = baseQuery.toString();
+    final String[] finalQueryParams = queryParams;
+    List<SnowflakeIdentifier> schemas;
+    try {
+      schemas =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, finalQueryParams));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to list schemas for scope '%s'", scope);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while listing schemas for scope '%s'", scope);
+    }
+    schemas.forEach(
+        schema ->
+            Preconditions.checkState(
+                schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+                "Expected SCHEMA, got identifier '%s' for scope '%s'",
+                schema,
+                scope));
+    return schemas;
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope) {
+    StringBuilder baseQuery = new StringBuilder("SHOW ICEBERG TABLES");
+    String[] queryParams = null;
+    switch (scope.type()) {
+      case ROOT:
+        // account-level listing
+        baseQuery.append(" IN ACCOUNT");
+        break;
+      case DATABASE:
+        // database-level listing
+        baseQuery.append(" IN DATABASE IDENTIFIER(?)");
+        queryParams = new String[] {scope.toIdentifierString()};
+        break;
+      case SCHEMA:
+        // schema-level listing
+        baseQuery.append(" IN SCHEMA IDENTIFIER(?)");
+        queryParams = new String[] {scope.toIdentifierString()};
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listIcebergTables: %s", scope));
+    }
+
+    final String finalQuery = baseQuery.toString();
+    final String[] finalQueryParams = queryParams;
+    List<SnowflakeIdentifier> tables;
+    try {
+      tables =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(conn, finalQuery, TABLE_RESULT_SET_HANDLER, finalQueryParams));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to list tables for scope '%s'", scope);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while listing tables for scope '%s'", scope);
+    }
+    tables.forEach(
+        table ->
+            Preconditions.checkState(
+                table.type() == SnowflakeIdentifier.Type.TABLE,
+                "Expected TABLE, got identifier '%s' for scope '%s'",
+                table,
+                scope));
+    return tables;
+  }
+
+  @Override
+  public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) {
+    Preconditions.checkArgument(
+        tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE,
+        "loadTableMetadata requires a TABLE identifier, got '%s'",
+        tableIdentifier);
+    SnowflakeTableMetadata tableMeta;
+    try {
+      final String finalQuery = "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA";
+      tableMeta =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn,
+                      finalQuery,
+                      TABLE_METADATA_RESULT_SET_HANDLER,
+                      tableIdentifier.toIdentifierString()));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to get table metadata for '%s'", tableIdentifier);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while getting table metadata for '%s'", tableIdentifier);
+    }
+    return tableMeta;
+  }
+
+  @Override
+  public void close() {
+    connectionPool.close();
+  }
+}
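The LIKE-clause handling in JdbcSnowflakeClient above is simple enough to exercise in isolation. The sketch below (class name `LikeSanitizerSketch` and its `main` driver are illustrative stand-ins, not part of the patch) reproduces the same allowlist-and-wildcard rule, paired with the case-insensitive client-side post-filter the catalog applies to SHOW results:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LikeSanitizerSketch {
  // Replace anything outside the unquoted-identifier allowlist [a-zA-Z0-9_$]
  // with '_', Snowflake's single-character LIKE wildcard; a leading '$' is
  // also widened so the resulting pattern stays safe.
  public static String sanitize(String identifier) {
    String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
    if (sanitized.startsWith("$")) {
      sanitized = "_" + sanitized.substring(1);
    }
    return sanitized;
  }

  // The LIKE pattern only narrows the candidate set; correctness comes from
  // this client-side, case-insensitive post-filter on the returned names.
  public static List<String> postFilter(List<String> names, String wanted) {
    List<String> result = new ArrayList<>(names);
    result.removeIf(name -> !wanted.equalsIgnoreCase(name));
    return result;
  }

  public static void main(String[] args) {
    System.out.println(sanitize("MY-DB"));  // '-' widens to the '_' wildcard
    // "MY_DB" also matches the widened pattern, so it must be filtered out:
    System.out.println(postFilter(Arrays.asList("MY-DB", "MY_DB"), "MY-DB"));
  }
}
```

Because the sanitized pattern is strictly broader than the original name, skipping the post-filter would report false positives such as `MY_DB` matching a lookup for `MY-DB`.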
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java
new file mode 100644
index 0000000000..28dacbca98
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class NamespaceHelpers {
+  private static final int MAX_NAMESPACE_DEPTH = 2;
+  private static final int NAMESPACE_ROOT_LEVEL = 0;
+  private static final int NAMESPACE_DB_LEVEL = 1;
+  private static final int NAMESPACE_SCHEMA_LEVEL = 2;
+
+  private NamespaceHelpers() {}
+
+  /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) {
+    switch (namespace.length()) {
+      case NAMESPACE_ROOT_LEVEL:
+        return SnowflakeIdentifier.ofRoot();
+      case NAMESPACE_DB_LEVEL:
+        return SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1));
+      case NAMESPACE_SCHEMA_LEVEL:
+        return SnowflakeIdentifier.ofSchema(
+            namespace.level(NAMESPACE_DB_LEVEL - 1), namespace.level(NAMESPACE_SCHEMA_LEVEL - 1));
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Snowflake max namespace level is %d, got namespace '%s'",
+                MAX_NAMESPACE_DEPTH, namespace));
+    }
+  }
+
+  /**
+   * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the identifier must have
+   * exactly the right namespace depth to represent a fully-qualified Snowflake table identifier.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier identifier) {
+    SnowflakeIdentifier namespaceScope = toSnowflakeIdentifier(identifier.namespace());
+    Preconditions.checkArgument(
+        namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "Namespace portion of '%s' must be at the SCHEMA level, got namespaceScope '%s'",
+        identifier,
+        namespaceScope);
+    return SnowflakeIdentifier.ofTable(
+        namespaceScope.databaseName(), namespaceScope.schemaName(), identifier.name());
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an equivalent Iceberg
+   * Namespace; throws IllegalArgumentException if not an appropriate type.
+   */
+  public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) {
+    switch (identifier.type()) {
+      case ROOT:
+        return Namespace.empty();
+      case DATABASE:
+        return Namespace.of(identifier.databaseName());
+      case SCHEMA:
+        return Namespace.of(identifier.databaseName(), identifier.schemaName());
+      default:
+        throw new IllegalArgumentException(
+            String.format("Cannot convert identifier '%s' to Namespace", identifier));
+    }
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier to an equivalent Iceberg TableIdentifier; the identifier must be
+   * of type TABLE.
+   */
+  public static TableIdentifier toIcebergTableIdentifier(SnowflakeIdentifier identifier) {
+    Preconditions.checkArgument(
+        identifier.type() == SnowflakeIdentifier.Type.TABLE,
+        "SnowflakeIdentifier must be type TABLE, got '%s'",
+        identifier);
+    return TableIdentifier.of(
+        identifier.databaseName(), identifier.schemaName(), identifier.tableName());
+  }
+}
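The depth convention that NamespaceHelpers above encodes (an empty namespace is the account ROOT, one level names a DATABASE, two levels name a SCHEMA, and anything deeper is rejected) can be sketched on its own. The enum and method names here are illustrative stand-ins, not the real Iceberg or Snowflake types:

```java
public class NamespaceDepthSketch {
  enum Type { ROOT, DATABASE, SCHEMA }

  static final int MAX_NAMESPACE_DEPTH = 2;

  // Mirrors the depth switch in NamespaceHelpers.toSnowflakeIdentifier:
  // 0 levels -> account root, 1 -> database, 2 -> database.schema,
  // deeper namespaces are unsupported by Snowflake's resource model.
  public static Type typeForDepth(int levels) {
    switch (levels) {
      case 0:
        return Type.ROOT;
      case 1:
        return Type.DATABASE;
      case 2:
        return Type.SCHEMA;
      default:
        throw new IllegalArgumentException(
            "Snowflake max namespace level is " + MAX_NAMESPACE_DEPTH + ", got " + levels);
    }
  }
}
```

Keeping this mapping in one place is what lets the SnowflakeClient layer stay free of Namespace-handling logic, as the commit message describes.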
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
new file mode 100644
index 0000000000..19302d5784
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+  private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
+
+  // Injectable factory for testing purposes.
+  static class FileIOFactory {
+    public FileIO newFileIO(String impl, Map<String, String> properties, Object hadoopConf) {
+      return CatalogUtil.loadFileIO(impl, properties, hadoopConf);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class);
+
+  private CloseableGroup closeableGroup;
+  private Object conf;
+  private String catalogName;
+  private Map<String, String> catalogProperties;
+  private FileIOFactory fileIOFactory;
+  private SnowflakeClient snowflakeClient;
+
+  public SnowflakeCatalog() {}
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    Preconditions.checkArgument(
+        scope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "listTables must be at SCHEMA level; got %s from namespace %s",
+        scope,
+        namespace);
+
+    List<SnowflakeIdentifier> sfTables = snowflakeClient.listIcebergTables(scope);
+
+    return sfTables.stream()
+        .map(NamespaceHelpers::toIcebergTableIdentifier)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropTable");
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support renameTable");
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String uri = properties.get(CatalogProperties.URI);
+    Preconditions.checkArgument(null != uri, "JDBC connection URI is required");
+    try {
+      // We'll ensure the expected JDBC driver implementation class is initialized through
+      // reflection regardless of which classloader ends up using this JdbcSnowflakeClient, but
+      // we'll only warn if the expected driver fails to load, since users may use repackaged or
+      // custom JDBC drivers for Snowflake communication.
+      Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL);
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn(
+          "Failed to load expected JDBC SnowflakeDriver - if queries fail by failing"
+              + " to find a suitable driver for jdbc:snowflake:// URIs, you must add the Snowflake "
+              + " JDBC driver to your jars/packages",
+          cnfe);
+    }
+    JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);
+
+    initialize(name, new JdbcSnowflakeClient(connectionPool), new FileIOFactory(), properties);
+  }
+
+  /**
+   * Initialize using caller-supplied SnowflakeClient and FileIO.
+   *
+   * @param name The name of the catalog, defaults to "snowflake_catalog"
+   * @param snowflakeClient The client encapsulating network communication with Snowflake
+   * @param fileIOFactory The {@link FileIOFactory} to use to instantiate a new FileIO for each new
+   *     table operation
+   * @param properties The catalog options to use and propagate to dependencies
+   */
+  @SuppressWarnings("checkstyle:HiddenField")
+  void initialize(
+      String name,
+      SnowflakeClient snowflakeClient,
+      FileIOFactory fileIOFactory,
+      Map<String, String> properties) {
+    Preconditions.checkArgument(null != snowflakeClient, "snowflakeClient must be non-null");
+    Preconditions.checkArgument(null != fileIOFactory, "fileIOFactory must be non-null");
+    this.catalogName = name == null ? DEFAULT_CATALOG_NAME : name;
+    this.snowflakeClient = snowflakeClient;
+    this.fileIOFactory = fileIOFactory;
+    this.catalogProperties = properties;
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(snowflakeClient);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != closeableGroup) {
+      closeableGroup.close();
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support createNamespace");
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) {
+    SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    List<SnowflakeIdentifier> results = null;
+    switch (scope.type()) {
+      case ROOT:
+        results = snowflakeClient.listDatabases();
+        break;
+      case DATABASE:
+        results = snowflakeClient.listSchemas(scope);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "listNamespaces must be at either ROOT or DATABASE level; got %s from namespace %s",
+                scope, namespace));
+    }
+
+    return results.stream().map(NamespaceHelpers::toIcebergNamespace).collect(Collectors.toList());
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace)
+      throws NoSuchNamespaceException {
+    SnowflakeIdentifier id = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    boolean namespaceExists;
+    switch (id.type()) {
+      case DATABASE:
+        namespaceExists = snowflakeClient.databaseExists(id);
+        break;
+      case SCHEMA:
+        namespaceExists = snowflakeClient.schemaExists(id);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "loadNamespaceMetadata must be at either DATABASE or SCHEMA level; got %s from namespace %s",
+                id, namespace));
+    }
+    if (namespaceExists) {
+      return ImmutableMap.of();
+    } else {
+      throw new NoSuchNamespaceException(
+          "Namespace '%s' with snowflake identifier '%s' doesn't exist", namespace, id);
+    }
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropNamespace");
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support setProperties");
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support removeProperties");
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    String fileIOImpl = DEFAULT_FILE_IO_IMPL;
+    if (catalogProperties.containsKey(CatalogProperties.FILE_IO_IMPL)) {
+      fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL);
+    }
+
+    // Initialize a fresh FileIO for each TableOperations created, because some FileIO
+    // implementations such as S3FileIO can become bound to a single S3 bucket. Additionally,
+    // FileIO implementations often support only a finite set of one or more URI schemes (i.e.
+    // S3FileIO only supports s3/s3a/s3n, and even ResolvingFileIO only supports the combination
+    // of schemes registered for S3FileIO and HadoopFileIO). Individual catalogs may need to
+    // support tables across different cloud/storage providers with disjoint FileIO implementations.
+    FileIO fileIO = fileIOFactory.newFileIO(fileIOImpl, catalogProperties, conf);
+    closeableGroup.addCloseable(fileIO);
+    return new SnowflakeTableOperations(snowflakeClient, fileIO, catalogName, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support defaultWarehouseLocation");
+  }
+
+  @Override
+  public void setConf(Object conf) {
+    this.conf = conf;
+  }
+}
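As context for the `uri` check in `initialize()` above (not part of this commit): that property is the standard Iceberg `CatalogProperties.URI`, so a catalog like this would typically be wired into Spark along these lines. The catalog name `snowflake` and the account hostname are placeholders; the property-name pattern is the usual Iceberg Spark custom-catalog convention.

```properties
# Hypothetical Spark SQL configuration; <account> is a placeholder.
spark.sql.catalog.snowflake=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.snowflake.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog
spark.sql.catalog.snowflake.uri=jdbc:snowflake://<account>.snowflakecomputing.com
```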
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java
new file mode 100644
index 0000000000..2dfadb9a65
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * This interface abstracts out the underlying communication protocols for contacting Snowflake to
+ * obtain the various resource representations defined under "entities". Classes using this
+ * interface should minimize assumptions about whether an underlying client uses e.g. REST, JDBC or
+ * other underlying libraries/protocols.
+ */
+interface SnowflakeClient extends Closeable {
+
+  /** Returns true if the database exists, false otherwise. */
+  boolean databaseExists(SnowflakeIdentifier database);
+
+  /** Returns true if the schema and its parent database exist, false otherwise. */
+  boolean schemaExists(SnowflakeIdentifier schema);
+
+  /** Lists all Snowflake databases within the currently configured account. */
+  List<SnowflakeIdentifier> listDatabases();
+
+  /**
+   * Lists all Snowflake schemas within a given scope. Returned SnowflakeIdentifiers must have
+   * type() == SnowflakeIdentifier.Type.SCHEMA.
+   *
+   * @param scope The scope in which to list, which may be ROOT or a single DATABASE.
+   */
+  List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope);
+
+  /**
+   * Lists all Snowflake Iceberg tables within a given scope. Returned SnowflakeIdentifiers must
+   * have type() == SnowflakeIdentifier.Type.TABLE.
+   *
+   * @param scope The scope in which to list, which may be ROOT, a DATABASE, or a SCHEMA.
+   */
+  List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope);
+
+  /**
+   * Returns Snowflake-level metadata containing locations to more detailed metadata.
+   *
+   * @param tableIdentifier The fully-qualified identifier that must be of type
+   *     SnowflakeIdentifier.Type.TABLE.
+   */
+  SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier);
+}
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java
new file mode 100644
index 0000000000..3082b1d8e5
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Since the SnowflakeCatalog supports exactly two levels of Iceberg Namespaces, corresponding
+ * directly to the "database" and "schema" portions of Snowflake's resource model, this class
+ * represents a pre-validated and structured representation of a fully-qualified Snowflake resource
+ * identifier. Snowflake-specific helper libraries should operate on this representation instead of
+ * directly operating on TableIdentifiers or Namespaces wherever possible to avoid duplication of
+ * parsing/validation logic for Iceberg TableIdentifier/Namespace levels.
+ */
+class SnowflakeIdentifier {
+  public enum Type {
+    ROOT,
+    DATABASE,
+    SCHEMA,
+    TABLE
+  }
+
+  private final String databaseName;
+  private final String schemaName;
+  private final String tableName;
+  private final Type type;
+
+  private SnowflakeIdentifier(String databaseName, String schemaName, String tableName, Type type) {
+    this.databaseName = databaseName;
+    this.schemaName = schemaName;
+    this.tableName = tableName;
+    this.type = type;
+  }
+
+  public static SnowflakeIdentifier ofRoot() {
+    return new SnowflakeIdentifier(null, null, null, Type.ROOT);
+  }
+
+  public static SnowflakeIdentifier ofDatabase(String databaseName) {
+    Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+    return new SnowflakeIdentifier(databaseName, null, null, Type.DATABASE);
+  }
+
+  public static SnowflakeIdentifier ofSchema(String databaseName, String schemaName) {
+    Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+    Preconditions.checkArgument(null != schemaName, "schemaName must be non-null");
+    return new SnowflakeIdentifier(databaseName, schemaName, null, Type.SCHEMA);
+  }
+
+  public static SnowflakeIdentifier ofTable(
+      String databaseName, String schemaName, String tableName) {
+    Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+    Preconditions.checkArgument(null != schemaName, "schemaName must be non-null");
+    Preconditions.checkArgument(null != tableName, "tableName must be non-null");
+    return new SnowflakeIdentifier(databaseName, schemaName, tableName, Type.TABLE);
+  }
+
+  /**
+   * If type is TABLE, expect non-null databaseName, schemaName, and tableName. If type is SCHEMA,
+   * expect non-null databaseName and schemaName. If type is DATABASE, expect non-null databaseName.
+   * If type is ROOT, expect all of databaseName, schemaName, and tableName to be null.
+   */
+  public Type type() {
+    return type;
+  }
+
+  public String tableName() {
+    return tableName;
+  }
+
+  public String databaseName() {
+    return databaseName;
+  }
+
+  public String schemaName() {
+    return schemaName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof SnowflakeIdentifier)) {
+      return false;
+    }
+
+    SnowflakeIdentifier that = (SnowflakeIdentifier) o;
+    return Objects.equal(this.databaseName, that.databaseName)
+        && Objects.equal(this.schemaName, that.schemaName)
+        && Objects.equal(this.tableName, that.tableName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(databaseName, schemaName, tableName);
+  }
+
+  /** Returns this identifier as a String suitable for use in a Snowflake IDENTIFIER param. */
+  public String toIdentifierString() {
+    switch (type()) {
+      case TABLE:
+        return String.format("%s.%s.%s", databaseName, schemaName, tableName);
+      case SCHEMA:
+        return String.format("%s.%s", databaseName, schemaName);
+      case DATABASE:
+        return databaseName;
+      default:
+        return "";
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s: '%s'", type(), toIdentifierString());
+  }
+}
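As a quick illustration of the dotted-name convention produced by toIdentifierString() above, here is a standalone sketch (a hypothetical mirror of that method's switch, not the class itself) showing the output at each identifier level:

```java
// Standalone sketch mirroring SnowflakeIdentifier.toIdentifierString():
// TABLE -> db.schema.table, SCHEMA -> db.schema, DATABASE -> db, ROOT -> "".
public class IdentifierFormat {
  // Nulls mark absent levels; the real class enforces the per-Type
  // non-null invariants via Preconditions, which this sketch omits.
  static String identifierString(String db, String schema, String table) {
    if (table != null) {
      return String.format("%s.%s.%s", db, schema, table);
    } else if (schema != null) {
      return String.format("%s.%s", db, schema);
    } else if (db != null) {
      return db;
    }
    return "";
  }

  public static void main(String[] args) {
    System.out.println(identifierString("DB1", "SCHEMA1", "TABLE1")); // DB1.SCHEMA1.TABLE1
    System.out.println(identifierString("DB1", "SCHEMA1", null));     // DB1.SCHEMA1
    System.out.println(identifierString("DB1", null, null));          // DB1
  }
}
```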
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java
new file mode 100644
index 0000000000..c550b3e13a
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class SnowflakeTableMetadata {
+  public static final Pattern SNOWFLAKE_AZURE_PATTERN =
+      Pattern.compile("azure://([^/]+)/([^/]+)/(.*)");
+
+  private final String snowflakeMetadataLocation;
+  private final String icebergMetadataLocation;
+  private final String status;
+
+  // Note: Since not all sources will necessarily come from a raw JSON representation, this raw
+  // JSON should only be considered a convenient debugging field. Equality of two
+  // SnowflakeTableMetadata instances should not depend on equality of this field.
+  private final String rawJsonVal;
+
+  SnowflakeTableMetadata(
+      String snowflakeMetadataLocation,
+      String icebergMetadataLocation,
+      String status,
+      String rawJsonVal) {
+    this.snowflakeMetadataLocation = snowflakeMetadataLocation;
+    this.icebergMetadataLocation = icebergMetadataLocation;
+    this.status = status;
+    this.rawJsonVal = rawJsonVal;
+  }
+
+  /** Storage location of table metadata in Snowflake's path syntax. */
+  public String snowflakeMetadataLocation() {
+    return snowflakeMetadataLocation;
+  }
+
+  /** Storage location of table metadata in Iceberg's path syntax. */
+  public String icebergMetadataLocation() {
+    return icebergMetadataLocation;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof SnowflakeTableMetadata)) {
+      return false;
+    }
+
+    // Only consider parsed fields, not the raw JSON that may or may not be the original source of
+    // this instance.
+    SnowflakeTableMetadata that = (SnowflakeTableMetadata) o;
+    return Objects.equal(this.snowflakeMetadataLocation, that.snowflakeMetadataLocation)
+        && Objects.equal(this.icebergMetadataLocation, that.icebergMetadataLocation)
+        && Objects.equal(this.status, that.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(snowflakeMetadataLocation, icebergMetadataLocation, status);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "snowflakeMetadataLocation: '%s', icebergMetadataLocation: '%s', status: '%s'",
+        snowflakeMetadataLocation, icebergMetadataLocation, status);
+  }
+
+  public String toDebugString() {
+    return String.format("%s, rawJsonVal: %s", toString(), rawJsonVal);
+  }
+
+  /**
+   * Translates from Snowflake's path syntax to Iceberg's path syntax for paths matching known
+   * non-compatible Snowflake paths. Throws IllegalArgumentException if the prefix of the
+   * snowflakeLocation is a known non-compatible path syntax but fails to match the expected path
+   * components for a successful translation.
+   */
+  public static String snowflakeLocationToIcebergLocation(String snowflakeLocation) {
+    if (snowflakeLocation.startsWith("azure://")) {
+      // Convert from expected path of the form:
+      // azure://account.blob.core.windows.net/container/volumepath
+      // to:
+      // wasbs://[email protected]/volumepath
+      Matcher matcher = SNOWFLAKE_AZURE_PATTERN.matcher(snowflakeLocation);
+      Preconditions.checkArgument(
+          matcher.matches(),
+          "Location '%s' failed to match pattern '%s'",
+          snowflakeLocation,
+          SNOWFLAKE_AZURE_PATTERN);
+      return String.format(
+          "wasbs://%s@%s/%s", matcher.group(2), matcher.group(1), matcher.group(3));
+    } else if (snowflakeLocation.startsWith("gcs://")) {
+      // Convert from expected path of the form:
+      // gcs://bucket/path
+      // to:
+      // gs://bucket/path
+      return "gs" + snowflakeLocation.substring(3);
+    }
+
+    return snowflakeLocation;
+  }
+
+  /**
+   * Factory method for parsing a JSON string containing expected Snowflake table metadata into a
+   * SnowflakeTableMetadata object.
+   */
+  public static SnowflakeTableMetadata parseJson(String json) {
+    JsonNode parsedVal;
+    try {
+      parsedVal = JsonUtil.mapper().readValue(json, JsonNode.class);
+    } catch (IOException ioe) {
+      throw new IllegalArgumentException(String.format("Malformed JSON: %s", json), ioe);
+    }
+
+    String snowflakeMetadataLocation = JsonUtil.getString("metadataLocation", parsedVal);
+    String status = JsonUtil.getStringOrNull("status", parsedVal);
+
+    String icebergMetadataLocation = snowflakeLocationToIcebergLocation(snowflakeMetadataLocation);
+
+    return new SnowflakeTableMetadata(
+        snowflakeMetadataLocation, icebergMetadataLocation, status, json);
+  }
+}
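The azure://→wasbs:// and gcs://→gs:// rewrite rules above can be checked in isolation; this is a standalone sketch of the same regex logic (a hypothetical helper class, not the Iceberg one):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of snowflakeLocationToIcebergLocation(): rewrites
// Snowflake's azure:// and gcs:// location prefixes into the wasbs:// and
// gs:// schemes that Iceberg FileIO implementations expect; any other
// scheme (e.g. s3://) passes through unchanged.
public class LocationTranslation {
  private static final Pattern AZURE = Pattern.compile("azure://([^/]+)/([^/]+)/(.*)");

  static String translate(String location) {
    if (location.startsWith("azure://")) {
      Matcher m = AZURE.matcher(location);
      if (!m.matches()) {
        throw new IllegalArgumentException("Unrecognized Azure location: " + location);
      }
      // azure://account.host/container/path -> wasbs://container@account.host/path
      return String.format("wasbs://%s@%s/%s", m.group(2), m.group(1), m.group(3));
    } else if (location.startsWith("gcs://")) {
      // gcs://bucket/path -> gs://bucket/path
      return "gs" + location.substring(3);
    }
    return location;
  }

  public static void main(String[] args) {
    System.out.println(translate("azure://acct.blob.core.windows.net/container/vol/v1.metadata.json"));
    // wasbs://[email protected]/vol/v1.metadata.json
    System.out.println(translate("gcs://bucket/path/v1.metadata.json")); // gs://bucket/path/v1.metadata.json
    System.out.println(translate("s3://bucket/path/v1.metadata.json"));  // s3://bucket/path/v1.metadata.json
  }
}
```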
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
new file mode 100644
index 0000000000..1fe90d7eff
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SnowflakeTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeTableOperations.class);
+
+  private final FileIO fileIO;
+  private final TableIdentifier tableIdentifier;
+  private final SnowflakeIdentifier snowflakeIdentifierForTable;
+  private final String fullTableName;
+
+  private final SnowflakeClient snowflakeClient;
+
+  protected SnowflakeTableOperations(
+      SnowflakeClient snowflakeClient,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier tableIdentifier) {
+    this.snowflakeClient = snowflakeClient;
+    this.fileIO = fileIO;
+    this.tableIdentifier = tableIdentifier;
+    this.snowflakeIdentifierForTable = NamespaceHelpers.toSnowflakeIdentifier(tableIdentifier);
+    this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier);
+  }
+
+  @Override
+  public void doRefresh() {
+    LOG.debug("Getting metadata location for table {}", tableIdentifier);
+    String location = loadTableMetadataLocation();
+    Preconditions.checkState(
+        location != null && !location.isEmpty(),
+        "Got null or empty location %s for table %s",
+        location,
+        tableIdentifier);
+    refreshFromMetadataLocation(location);
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected String tableName() {
+    return fullTableName;
+  }
+
+  @VisibleForTesting
+  String fullTableName() {
+    return tableName();
+  }
+
+  private String loadTableMetadataLocation() {
+    SnowflakeTableMetadata metadata =
+        snowflakeClient.loadTableMetadata(snowflakeIdentifierForTable);
+
+    if (metadata == null) {
+      throw new NoSuchTableException("Cannot find table %s", snowflakeIdentifierForTable);
+    }
+
+    if (!metadata.getStatus().equals("success")) {
+      LOG.warn(
+          "Got non-successful table metadata: {} with metadataLocation {} for table {}",
+          metadata.getStatus(),
+          metadata.icebergMetadataLocation(),
+          snowflakeIdentifierForTable);
+    }
+
+    return metadata.icebergMetadataLocation();
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java
new file mode 100644
index 0000000000..834dc3c6c4
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class FakeSnowflakeClient implements SnowflakeClient {
+  // In-memory lookup by database/schema/tableName to table metadata.
+  private final Map<String, Map<String, Map<String, SnowflakeTableMetadata>>> databases =
+      Maps.newTreeMap();
+  private boolean closed = false;
+
+  public FakeSnowflakeClient() {}
+
+  /**
+   * Also adds parent database/schema if they don't already exist. If the tableName already exists
+   * under the given database/schema, the value is replaced with the provided metadata.
+   */
+  public void addTable(SnowflakeIdentifier tableIdentifier, SnowflakeTableMetadata metadata) {
+    Preconditions.checkState(!closed, "Cannot call addTable after calling close()");
+    if (!databases.containsKey(tableIdentifier.databaseName())) {
+      databases.put(tableIdentifier.databaseName(), Maps.newTreeMap());
+    }
+    Map<String, Map<String, SnowflakeTableMetadata>> schemas =
+        databases.get(tableIdentifier.databaseName());
+    if (!schemas.containsKey(tableIdentifier.schemaName())) {
+      schemas.put(tableIdentifier.schemaName(), Maps.newTreeMap());
+    }
+    Map<String, SnowflakeTableMetadata> tables = schemas.get(tableIdentifier.schemaName());
+    tables.put(tableIdentifier.tableName(), metadata);
+  }
+
+  @Override
+  public boolean databaseExists(SnowflakeIdentifier database) {
+    return databases.containsKey(database.databaseName());
+  }
+
+  @Override
+  public boolean schemaExists(SnowflakeIdentifier schema) {
+    return databases.containsKey(schema.databaseName())
+        && databases.get(schema.databaseName()).containsKey(schema.schemaName());
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listDatabases() {
+    Preconditions.checkState(!closed, "Cannot call listDatabases after calling close()");
+    List<SnowflakeIdentifier> databaseIdentifiers = Lists.newArrayList();
+    for (String databaseName : databases.keySet()) {
+      databaseIdentifiers.add(SnowflakeIdentifier.ofDatabase(databaseName));
+    }
+    return databaseIdentifiers;
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope) {
+    Preconditions.checkState(!closed, "Cannot call listSchemas after calling close()");
+    List<SnowflakeIdentifier> schemas = Lists.newArrayList();
+    switch (scope.type()) {
+      case ROOT:
+        // "account-level" listing.
+        for (Map.Entry<String, Map<String, Map<String, SnowflakeTableMetadata>>> db :
+            databases.entrySet()) {
+          for (String schema : db.getValue().keySet()) {
+            schemas.add(SnowflakeIdentifier.ofSchema(db.getKey(), schema));
+          }
+        }
+        break;
+      case DATABASE:
+        String dbName = scope.databaseName();
+        if (databases.containsKey(dbName)) {
+          for (String schema : databases.get(dbName).keySet()) {
+            schemas.add(SnowflakeIdentifier.ofSchema(dbName, schema));
+          }
+        } else {
+          throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listSchemas: '%s'", scope));
+    }
+    return schemas;
+  }
+
+  @Override
+  public List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope) {
+    Preconditions.checkState(!closed, "Cannot call listIcebergTables after calling close()");
+    List<SnowflakeIdentifier> tables = Lists.newArrayList();
+    switch (scope.type()) {
+      case ROOT:
+        {
+          // "account-level" listing.
+          for (Map.Entry<String, Map<String, Map<String, SnowflakeTableMetadata>>> db :
+              databases.entrySet()) {
+            for (Map.Entry<String, Map<String, SnowflakeTableMetadata>> schema :
+                db.getValue().entrySet()) {
+              for (String tableName : schema.getValue().keySet()) {
+                tables.add(SnowflakeIdentifier.ofTable(db.getKey(), schema.getKey(), tableName));
+              }
+            }
+          }
+          break;
+        }
+      case DATABASE:
+        {
+          String dbName = scope.databaseName();
+          if (databases.containsKey(dbName)) {
+            for (Map.Entry<String, Map<String, SnowflakeTableMetadata>> schema :
+                databases.get(dbName).entrySet()) {
+              for (String tableName : schema.getValue().keySet()) {
+                tables.add(SnowflakeIdentifier.ofTable(dbName, schema.getKey(), tableName));
+              }
+            }
+          } else {
+            throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName);
+          }
+          break;
+        }
+      case SCHEMA:
+        {
+          String dbName = scope.databaseName();
+          if (databases.containsKey(dbName)) {
+            String schemaName = scope.schemaName();
+            if (databases.get(dbName).containsKey(schemaName)) {
+              for (String tableName : databases.get(dbName).get(schemaName).keySet()) {
+                tables.add(SnowflakeIdentifier.ofTable(dbName, schemaName, tableName));
+              }
+            } else {
+              throw new UncheckedSQLException(
+                  "Object does not exist: database.schema: '%s.%s'", dbName, schemaName);
+            }
+          } else {
+            throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName);
+          }
+          break;
+        }
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listing tables: %s", scope));
+    }
+    return tables;
+  }
+
+  @Override
+  public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) {
+    Preconditions.checkState(!closed, "Cannot call loadTableMetadata after calling close()");
+
+    Preconditions.checkArgument(
+        tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE,
+        "tableIdentifier must be type TABLE, got: %s",
+        tableIdentifier);
+    String dbName = tableIdentifier.databaseName();
+    String schemaName = tableIdentifier.schemaName();
+    if (!databases.containsKey(dbName)
+        || !databases.get(dbName).containsKey(schemaName)
+        || !databases.get(dbName).get(schemaName).containsKey(tableIdentifier.tableName())) {
+      throw new UncheckedSQLException("Object does not exist: object: '%s'", tableIdentifier);
+    }
+    return databases.get(dbName).get(schemaName).get(tableIdentifier.tableName());
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public void close() {
+    closed = true;
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
new file mode 100644
index 0000000000..1374ad8ac2
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcSnowflakeClientTest {
+  @Mock private Connection mockConnection;
+  @Mock private JdbcClientPool mockClientPool;
+  @Mock private JdbcSnowflakeClient.QueryHarness mockQueryHarness;
+  @Mock private ResultSet mockResultSet;
+
+  private JdbcSnowflakeClient snowflakeClient;
+
+  @Before
+  public void before() throws SQLException, InterruptedException {
+    snowflakeClient = new JdbcSnowflakeClient(mockClientPool);
+    snowflakeClient.setQueryHarness(mockQueryHarness);
+
+    doAnswer(invocation -> ((ClientPool.Action) invocation.getArguments()[0]).run(mockConnection))
+        .when(mockClientPool)
+        .run(any(ClientPool.Action.class));
+    doAnswer(
+            invocation ->
+                ((JdbcSnowflakeClient.ResultSetParser) invocation.getArguments()[2])
+                    .parse(mockResultSet))
+        .when(mockQueryHarness)
+        .query(
+            any(Connection.class),
+            any(String.class),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            ArgumentMatchers.<String>any());
+  }
+
+  @Test
+  public void testNullClientPoolInConstructor() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> new JdbcSnowflakeClient(null))
+        .withMessageContaining("JdbcClientPool must be non-null");
+  }
+
+  @Test
+  public void testDatabaseExists() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1");
+
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isTrue();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+  }
+
+  @Test
+  public void testDatabaseExistsSpecialCharacters() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("$DB_1$.'!@#%^&*");
+
+    Assertions.assertThat(
+            snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("$DB_1$.'!@#%^&*")))
+        .isTrue();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES LIKE '_DB_1$_________' IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+  }
+
+  @Test
+  public void testDatabaseDoesntExistNoResults() throws SQLException {
+    when(mockResultSet.next()).thenReturn(false);
+
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testDatabaseDoesntExistMismatchedResults() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DBZ1");
+
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testSchemaExists() throws SQLException {
+    when(mockResultSet.next())
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMA_1");
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+        .isTrue();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS LIKE 'SCHEMA_1' IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+  }
+
+  @Test
+  public void testSchemaExistsSpecialCharacters() throws SQLException {
+    when(mockResultSet.next())
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("$SCHEMA_1$.'!@#%^&*");
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(
+                SnowflakeIdentifier.ofSchema("DB_1", "$SCHEMA_1$.'!@#%^&*")))
+        .isTrue();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS LIKE '_SCHEMA_1$_________' IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+  }
+
+  @Test
+  public void testSchemaDoesntExistMismatchDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DBZ1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testSchemaDoesntExistNoSchemaFound() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testSchemaDoesntExistSchemaMismatch() throws SQLException {
+    when(mockResultSet.next())
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMAZ1");
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testListDatabasesInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("DB_2").thenReturn("DB_3");
+
+    List<SnowflakeIdentifier> actualList = snowflakeClient.listDatabases();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofDatabase("DB_1"),
+            SnowflakeIdentifier.ofDatabase("DB_2"),
+            SnowflakeIdentifier.ofDatabase("DB_3"));
+  }
+
+  /**
+   * For the root scope, expect an underlying query to list schemas at the ACCOUNT level with no
+   * query parameters.
+   */
+  @Test
+  public void testListSchemasInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_2");
+    when(mockResultSet.getString("name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2")
+        .thenReturn("SCHEMA_3");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listSchemas(SnowflakeIdentifier.ofRoot());
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq(null));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"),
+            SnowflakeIdentifier.ofSchema("DB_2", "SCHEMA_3"));
+  }
+
+  /**
+   * For a DATABASE scope, expect an underlying query to list schemas at the DATABASE level and
+   * supply the database as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListSchemasInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_2");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"));
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing schemas.
+   */
+  @Test
+  public void testListSchemasSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when listing schemas.
+   */
+  @Test
+  public void testListSchemasInterruptedException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /**
+   * For the root/empty scope, expect an underlying query to list tables at the ACCOUNT level with
+   * no query parameters.
+   */
+  @Test
+  public void testListIcebergTablesInAccount() throws SQLException {
+    when(mockResultSet.next())
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_2");
+    when(mockResultSet.getString("schema_name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2")
+        .thenReturn("SCHEMA_3");
+    when(mockResultSet.getString("name"))
+        .thenReturn("TABLE_1")
+        .thenReturn("TABLE_2")
+        .thenReturn("TABLE_3")
+        .thenReturn("TABLE_4");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofRoot());
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq(null));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"),
+            SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_3", "TABLE_4"));
+  }
+
+  /**
+   * For a DATABASE scope, expect an underlying query to list tables at the DATABASE level and
+   * supply the database as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListIcebergTablesInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_1");
+    when(mockResultSet.getString("schema_name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2");
+    when(mockResultSet.getString("name"))
+        .thenReturn("TABLE_1")
+        .thenReturn("TABLE_2")
+        .thenReturn("TABLE_3");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"));
+  }
+
+  /**
+   * For a SCHEMA scope, expect an underlying query to list tables at the SCHEMA level and supply
+   * the schema as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListIcebergTablesInSchema() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("schema_name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_1");
+    when(mockResultSet.getString("name")).thenReturn("TABLE_1").thenReturn("TABLE_2");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN SCHEMA IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"));
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing tables.
+   */
+  @Test
+  public void testListIcebergTablesSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when listing tables.
+   */
+  @Test
+  public void testListIcebergTablesInterruptedException()
+      throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the S3 path
+   * unaltered between snowflake/iceberg path representations.
+   */
+  @Test
+  public void testGetS3TableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "s3://tab1/metadata/v3.metadata.json",
+            "s3://tab1/metadata/v3.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the Azure
+   * path translated from an azure:// format to a wasbs:// format.
+   */
+  @Test
+  public void testGetAzureTableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json",
+            "wasbs://mycontainer@myaccount.blob.core.windows.net/tab3/metadata/v334.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the GCS
+   * path translated from a gcs:// format to a gs:// format.
+   */
+  @Test
+  public void testGetGcsTableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "gcs://tab5/metadata/v793.metadata.json",
+            "gs://tab5/metadata/v793.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /** Malformed JSON from a ResultSet should propagate as an IllegalArgumentException. */
+  @Test
+  public void testGetTableMetadataMalformedJson() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA")).thenReturn("{\"malformed_no_closing_bracket");
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withMessageContaining("{\"malformed_no_closing_bracket");
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when getting table metadata.
+   */
+  @Test
+  public void testGetTableMetadataSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when getting table metadata.
+   */
+  @Test
+  public void testGetTableMetadataInterruptedException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /** Calling close() propagates to closing underlying client pool. */
+  @Test
+  public void testClose() {
+    snowflakeClient.close();
+    verify(mockClientPool).close();
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
new file mode 100644
index 0000000000..2dd7fb6ec9
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class NamespaceHelpersTest {
+  @Test
+  public void testRoundTripRoot() {
+    Namespace icebergNamespace = Namespace.empty();
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofRoot());
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripDatabase() {
+    Namespace icebergNamespace = Namespace.of("DB1");
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofDatabase("DB1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripSchema() {
+    Namespace icebergNamespace = Namespace.of("DB1", "SCHEMA1");
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier)
+        .isEqualTo(SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripTable() {
+    TableIdentifier icebergTable = TableIdentifier.of("DB1", "SCHEMA1", "TABLE1");
+    SnowflakeIdentifier snowflakeIdentifier = NamespaceHelpers.toSnowflakeIdentifier(icebergTable);
+    Assertions.assertThat(snowflakeIdentifier)
+        .isEqualTo(SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergTableIdentifier(snowflakeIdentifier))
+        .isEqualTo(icebergTable);
+  }
+
+  @Test
+  public void testToSnowflakeIdentifierMaxNamespaceLevel() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toSnowflakeIdentifier(
+                    Namespace.of("DB1", "SCHEMA1", "THIRD_NS_LVL")))
+        .withMessageContaining("max namespace level");
+  }
+
+  @Test
+  public void testToSnowflakeIdentifierTableBadNamespace() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toSnowflakeIdentifier(
+                    TableIdentifier.of(Namespace.of("DB1_WITHOUT_SCHEMA"), "TABLE1")))
+        .withMessageContaining("must be at the SCHEMA level");
+  }
+
+  @Test
+  public void testToIcebergNamespaceTableFails() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toIcebergNamespace(
+                    SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+        .withMessageContaining("Cannot convert identifier");
+  }
+
+  @Test
+  public void testToIcebergTableIdentifier() {
+    Assertions.assertThat(
+            NamespaceHelpers.toIcebergTableIdentifier(
+                SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+        .isEqualTo(TableIdentifier.of("DB1", "SCHEMA1", "TABLE1"));
+  }
+
+  @Test
+  public void testToIcebergTableIdentifierWrongType() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toIcebergTableIdentifier(
+                    SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1")))
+        .withMessageContaining("must be type TABLE");
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
new file mode 100644
index 0000000000..9f66f352e8
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {
+
+  private static final String TEST_CATALOG_NAME = "slushLog";
+  private SnowflakeCatalog catalog;
+  private FakeSnowflakeClient fakeClient;
+  private InMemoryFileIO fakeFileIO;
+  private SnowflakeCatalog.FileIOFactory fakeFileIOFactory;
+  private Map<String, String> properties;
+
+  @Before
+  public void before() {
+    catalog = new SnowflakeCatalog();
+
+    fakeClient = new FakeSnowflakeClient();
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_1"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_2"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab2/metadata/v1.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_3"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_4"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab4/metadata/v323.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_3", "SCHEMA_3", "TAB_5"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_3", "SCHEMA_4", "TAB_6"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"gcs://tab6/metadata/v123.metadata.json\",\"status\":\"success\"}"));
+
+    fakeFileIO = new InMemoryFileIO();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"),
+            Types.NestedField.required(2, "y", Types.StringType.get(), "comment2"));
+    PartitionSpec partitionSpec =
+        PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build();
+    fakeFileIO.addFile(
+        "s3://tab1/metadata/v3.metadata.json",
+        TableMetadataParser.toJson(
+                TableMetadata.newTableMetadata(
+                    schema, partitionSpec, "s3://tab1/", ImmutableMap.<String, String>of()))
+            .getBytes());
+    fakeFileIO.addFile(
+        "wasbs://[email protected]/tab3/metadata/v334.metadata.json",
+        TableMetadataParser.toJson(
+                TableMetadata.newTableMetadata(
+                    schema,
+                    partitionSpec,
+                    "wasbs://[email protected]/tab1/",
+                    ImmutableMap.<String, String>of()))
+            .getBytes());
+        "gs://tab5/metadata/v793.metadata.json",
+        TableMetadataParser.toJson(
+                TableMetadata.newTableMetadata(
+                    schema, partitionSpec, "gs://tab5/", ImmutableMap.<String, String>of()))
+            .getBytes());
+
+    fakeFileIOFactory =
+        new SnowflakeCatalog.FileIOFactory() {
+          @Override
+          public FileIO newFileIO(String impl, Map<String, String> prop, Object hadoopConf) {
+            return fakeFileIO;
+          }
+        };
+
+    properties = Maps.newHashMap();
+    catalog.initialize(TEST_CATALOG_NAME, fakeClient, fakeFileIOFactory, properties);
+  }
+
+  @Test
+  public void testInitializeNullClient() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () -> catalog.initialize(TEST_CATALOG_NAME, null, fakeFileIOFactory, properties))
+        .withMessageContaining("snowflakeClient must be non-null");
+  }
+
+  @Test
+  public void testInitializeNullFileIO() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.initialize(TEST_CATALOG_NAME, fakeClient, null, properties))
+        .withMessageContaining("fileIOFactory must be non-null");
+  }
+
+  @Test
+  public void testListNamespaceInRoot() {
+    Assertions.assertThat(catalog.listNamespaces())
+        .containsExactly(Namespace.of("DB_1"), Namespace.of("DB_2"), Namespace.of("DB_3"));
+  }
+
+  @Test
+  public void testListNamespaceWithinDB() {
+    String dbName = "DB_1";
+    Assertions.assertThat(catalog.listNamespaces(Namespace.of(dbName)))
+        .containsExactly(Namespace.of(dbName, "SCHEMA_1"));
+  }
+
+  @Test
+  public void testListNamespaceWithinNonExistentDB() {
+    // Existence check for nonexistent parent namespaces is optional in the SupportsNamespaces
+    // interface.
+    String dbName = "NONEXISTENT_DB";
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName)))
+        .withMessageContaining("does not exist")
+        .withMessageContaining(dbName);
+  }
+
+  @Test
+  public void testListNamespaceWithinSchema() {
+    // No "sub-namespaces" beyond database.schema; invalid to try to list namespaces given
+    // a database.schema.
+    String dbName = "DB_3";
+    String schemaName = "SCHEMA_4";
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName, schemaName)))
+        .withMessageContaining("level")
+        .withMessageContaining("DB_3.SCHEMA_4");
+  }
+
+  @Test
+  public void testListTables() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.listTables(Namespace.empty()))
+        .withMessageContaining("listTables must be at SCHEMA level");
+  }
+
+  @Test
+  public void testListTablesWithinDB() {
+    String dbName = "DB_1";
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.listTables(Namespace.of(dbName)))
+        .withMessageContaining("listTables must be at SCHEMA level");
+  }
+
+  @Test
+  public void testListTablesWithinNonexistentDB() {
+    String dbName = "NONEXISTENT_DB";
+    String schemaName = "NONEXISTENT_SCHEMA";
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName)))
+        .withMessageContaining("does not exist")
+        .withMessageContaining(dbName);
+  }
+
+  @Test
+  public void testListTablesWithinSchema() {
+    String dbName = "DB_2";
+    String schemaName = "SCHEMA_2";
+    Assertions.assertThat(catalog.listTables(Namespace.of(dbName, schemaName)))
+        .containsExactly(
+            TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_3"),
+            TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_4"));
+  }
+
+  @Test
+  public void testListTablesWithinNonexistentSchema() {
+    String dbName = "DB_2";
+    String schemaName = "NONEXISTENT_SCHEMA";
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName)))
+        .withMessageContaining("does not exist")
+        .withMessageContaining("DB_2.NONEXISTENT_SCHEMA");
+  }
+
+  @Test
+  public void testLoadS3Table() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1"), "TAB_1"));
+    Assertions.assertThat(table.location()).isEqualTo("s3://tab1/");
+  }
+
+  @Test
+  public void testLoadAzureTable() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_2", "SCHEMA_2"), "TAB_3"));
+    Assertions.assertThat(table.location())
+        .isEqualTo("wasbs://[email protected]/tab1/");
+  }
+
+  @Test
+  public void testLoadGcsTable() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_3", "SCHEMA_3"), "TAB_5"));
+    Assertions.assertThat(table.location()).isEqualTo("gs://tab5/");
+  }
+
+  @Test
+  public void testLoadTableWithMalformedTableIdentifier() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                catalog.loadTable(
+                    TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1", "BAD_NS_LEVEL"), "TAB_1")))
+        .withMessageContaining("level")
+        .withMessageContaining("DB_1.SCHEMA_1.BAD_NS_LEVEL");
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () -> catalog.loadTable(TableIdentifier.of(Namespace.of("DB_WITHOUT_SCHEMA"), "TAB_1")))
+        .withMessageContaining("level")
+        .withMessageContaining("DB_WITHOUT_SCHEMA.TAB_1");
+  }
+
+  @Test
+  public void testCloseBeforeInitializeDoesntThrow() throws IOException {
+    catalog = new SnowflakeCatalog();
+
+    // Make sure no exception is thrown if we call close() before initialize(), in case callers
+    // add a catalog to auto-close() helpers but end up never using/initializing a catalog.
+    catalog.close();
+
+    Assertions.assertThat(fakeClient.isClosed())
+        .overridingErrorMessage("expected not to have called close() on snowflakeClient")
+        .isFalse();
+  }
+
+  @Test
+  public void testClose() throws IOException {
+    catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+    catalog.close();
+    Assertions.assertThat(fakeClient.isClosed())
+        .overridingErrorMessage("expected close() to propagate to snowflakeClient")
+        .isTrue();
+    Assertions.assertThat(fakeFileIO.isClosed())
+        .overridingErrorMessage("expected close() to propagate to fileIO")
+        .isTrue();
+  }
+
+  @Test
+  public void testTableNameFromTableOperations() {
+    SnowflakeTableOperations castedTableOps =
+        (SnowflakeTableOperations)
+            catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+    Assertions.assertThat(castedTableOps.fullTableName()).isEqualTo("slushLog.DB_1.SCHEMA_1.TAB_1");
+  }
+
+  @Test
+  public void testDatabaseExists() {
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1"))).isTrue();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB"))).isFalse();
+  }
+
+  @Test
+  public void testSchemaExists() {
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "SCHEMA_1"))).isTrue();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "NONEXISTENT_SCHEMA")))
+        .isFalse();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB", "SCHEMA_1")))
+        .isFalse();
+  }
+}
diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index eca34afcbd..c7861d36e5 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -213,6 +213,9 @@ project(':iceberg-spark:iceberg-spark-runtime-3.1_2.12') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+    implementation (project(':iceberg-snowflake')) {
+      exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+    }
 
     integrationImplementation "org.apache.spark:spark-hive_2.12:${sparkVersion}"
     integrationImplementation 'org.junit.vintage:junit-vintage-engine'
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 62a51ae494..c7bef9928d 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -224,6 +224,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+    implementation (project(':iceberg-snowflake')) {
+      exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+    }
 
     integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
     integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle
index 56a6fe7f51..3f7cf71b74 100644
--- a/spark/v3.3/build.gradle
+++ b/spark/v3.3/build.gradle
@@ -216,6 +216,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+    implementation (project(':iceberg-snowflake')) {
+      exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+    }
 
     integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
     integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/versions.props b/versions.props
index d007ff43ac..670487b997 100644
--- a/versions.props
+++ b/versions.props
@@ -28,6 +28,7 @@ org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0
 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0
 com.emc.ecs:object-client-bundle = 3.3.2
 org.immutables:value = 2.9.2
+net.snowflake:snowflake-jdbc = 3.13.22
 
 # test deps
 org.junit.vintage:junit-vintage-engine = 5.8.2
