This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e56c1cf8c1 Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables (#6428)
e56c1cf8c1 is described below
commit e56c1cf8c16f527bf91b74399602138e215f4186
Author: Dennis Huo <[email protected]>
AuthorDate: Fri Jan 13 19:57:46 2023 -0800
Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables (#6428)
* Initial read-only Snowflake Catalog implementation by @sfc-gh-mparmar (#1)
Initial read-only Snowflake Catalog implementation built on top of the
Snowflake JDBC driver, providing support for basic listing of namespaces,
listing of tables, and loading/reading of tables.
Auth options are passed through to the JDBC driver.
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Dennis Huo <[email protected]>
* Add JdbcSnowflakeClientTest using mocks (#2)
Add JdbcSnowflakeClientTest using mocks; provides full coverage of
JdbcSnowflakeClient and entities' ResultSetHandler logic.
Also update target Spark runtime versions to be included.
* Add test { useJUnitPlatform() } tuple to iceberg-snowflake for
consistency and future interoperability with inheriting from abstract
unit-test base classes.
* Extract versions into versions.props per PR review
* Misc test-related refactors per review suggestions
- Convert unit tests to all use assertj/Assertions for "fluent assertions"
- Refactor test injection into overloaded initialize() method
- Add test cases for close() propagation
- Use CloseableGroup.
* Fix unsupported behaviors of loadNamespaceMetadata and
defaultWarehouseLocation
* Move TableIdentifier checks out of newTableOps into the
SnowflakeTableOperations class itself, add test case.
* Refactor out any Namespace-related business logic from the lower
SnowflakeClient/JdbcSnowflakeClient layers and merge SnowflakeTable
and SnowflakeSchema into a single SnowflakeIdentifier that also
encompasses ROOT and DATABASE level identifiers.
A SnowflakeIdentifier thus functions like a type-checked/constrained
Iceberg TableIdentifier, and eliminates any tight coupling between
a SnowflakeClient and Catalog business logic.
Parsing of Namespace numerical levels into a SnowflakeIdentifier
is now fully encapsulated in NamespaceHelpers so that callsites
don't duplicate namespace-handling/validation logic.
* Finish migrating JdbcSnowflakeClientTest off any usage of org.junit.Assert
in favor of assertj's Assertions.
* Style refactorings from review comments; expanded and moved
InMemoryFileIO into core with its own unit test.
* Fix behavior of getNamespaceMetadata to throw when the namespace doesn't
exist.
Refactor for naming conventions and consolidating identifier
handling into NamespaceHelpers.
Make FileIO instantiated fresh for each newTableOps call.
* Move private constructor to top, add assertion to test case.
* Define minimal ResultSetParser/QueryHarness classes to fully replace
any use of commons-dbutils; refactor ResultSet handling fully into
JdbcSnowflakeClient.java.
* Update snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
Co-authored-by: Eduard Tudenhöfner <[email protected]>
* Refactor per style suggestions: remove debug-level logging, arguments in
exceptions, and private members not accessed outside the class; move
precondition checks; add test for NamespaceHelpers.
* Fix precondition messages, remove getConf()
* Clean up varargs.
* Make data members final, include rawJsonVal in toString for debuggability.
* Combine some small test cases into roundtrip test cases, misc cleanup
* Add comment for why a factory class is exposed for testing purposes.
Co-authored-by: Dennis Huo <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Maninder Parmar <[email protected]>
Co-authored-by: Eduard Tudenhöfner <[email protected]>
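
For orientation (not part of the patch): a catalog implementation like this is normally selected through Iceberg catalog properties. A hypothetical Spark configuration sketch follows; `catalog-impl` and `uri` match Iceberg's standard catalog property names (the code below reads `CatalogProperties.URI`), but the catalog name and account URL are illustrative assumptions:

```
spark.sql.catalog.snowflake=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.snowflake.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog
spark.sql.catalog.snowflake.uri=jdbc:snowflake://<account_identifier>.snowflakecomputing.com
```

The Snowflake JDBC driver must also be on the classpath, since this module declares it `runtimeOnly`.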
---
.github/labeler.yml | 4 +-
build.gradle | 18 +
.../org/apache/iceberg/jdbc/JdbcClientPool.java | 6 +-
settings.gradle | 2 +
.../iceberg/snowflake/JdbcSnowflakeClient.java | 378 +++++++++++++
.../apache/iceberg/snowflake/NamespaceHelpers.java | 100 ++++
.../apache/iceberg/snowflake/SnowflakeCatalog.java | 249 +++++++++
.../apache/iceberg/snowflake/SnowflakeClient.java | 64 +++
.../iceberg/snowflake/SnowflakeIdentifier.java | 133 +++++
.../iceberg/snowflake/SnowflakeTableMetadata.java | 150 +++++
.../snowflake/SnowflakeTableOperations.java | 98 ++++
.../iceberg/snowflake/FakeSnowflakeClient.java | 191 +++++++
.../iceberg/snowflake/JdbcSnowflakeClientTest.java | 603 +++++++++++++++++++++
.../iceberg/snowflake/NamespaceHelpersTest.java | 115 ++++
.../iceberg/snowflake/SnowflakeCatalogTest.java | 298 ++++++++++
spark/v3.1/build.gradle | 3 +
spark/v3.2/build.gradle | 3 +
spark/v3.3/build.gradle | 3 +
versions.props | 1 +
19 files changed, 2415 insertions(+), 4 deletions(-)
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 521e1a42aa..c623fbc6dd 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -81,4 +81,6 @@ ALIYUN:
GCP:
- gcp/**/*
DELL:
- - dell/**/*
\ No newline at end of file
+ - dell/**/*
+SNOWFLAKE:
+ - snowflake/**/*
diff --git a/build.gradle b/build.gradle
index 0fba8cf1a9..7b14f3b731 100644
--- a/build.gradle
+++ b/build.gradle
@@ -697,6 +697,24 @@ project(':iceberg-dell') {
}
}
+project(':iceberg-snowflake') {
+ test {
+ useJUnitPlatform()
+ }
+
+ dependencies {
+ implementation project(':iceberg-core')
+ implementation project(':iceberg-common')
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.core:jackson-core"
+
+ runtimeOnly("net.snowflake:snowflake-jdbc")
+
+    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
+ }
+}
+
@Memoized
boolean versionFileExists() {
return file('version.txt').exists()
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
index daa04908f4..60e5eb49a4 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -27,12 +27,12 @@ import java.util.Properties;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPoolImpl;
-class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
private final String dbUrl;
private final Map<String, String> properties;
- JdbcClientPool(String dbUrl, Map<String, String> props) {
+ public JdbcClientPool(String dbUrl, Map<String, String> props) {
this(
Integer.parseInt(
props.getOrDefault(
@@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
props);
}
- JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
+  public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
super(poolSize, SQLNonTransientConnectionException.class, true);
properties = props;
this.dbUrl = dbUrl;
diff --git a/settings.gradle b/settings.gradle
index d1a14abe5b..c5ac07e080 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -34,6 +34,7 @@ include 'hive-metastore'
include 'nessie'
include 'gcp'
include 'dell'
+include 'snowflake'
project(':api').name = 'iceberg-api'
project(':common').name = 'iceberg-common'
@@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore'
project(':nessie').name = 'iceberg-nessie'
project(':gcp').name = 'iceberg-gcp'
project(':dell').name = 'iceberg-dell'
+project(':snowflake').name = 'iceberg-snowflake'
if (null != System.getProperty("allVersions")) {
System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
new file mode 100644
index 0000000000..1618f76c10
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * This implementation of SnowflakeClient builds on top of Snowflake's JDBC driver to interact with
+ * Snowflake's Iceberg-aware resource model.
+ */
+class JdbcSnowflakeClient implements SnowflakeClient {
+  static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver";
+
+ @FunctionalInterface
+ interface ResultSetParser<T> {
+ T parse(ResultSet rs) throws SQLException;
+ }
+
+ /**
+   * This class wraps the basic boilerplate of setting up PreparedStatements and applying a
+   * ResultSetParser to translate a ResultSet into parsed objects. Allows easily injecting
+   * subclasses for debugging/testing purposes.
+ */
+ static class QueryHarness {
+    public <T> T query(Connection conn, String sql, ResultSetParser<T> parser, String... args)
+ throws SQLException {
+ try (PreparedStatement statement = conn.prepareStatement(sql)) {
+ if (args != null) {
+ for (int i = 0; i < args.length; ++i) {
+ statement.setString(i + 1, args[i]);
+ }
+ }
+
+ try (ResultSet rs = statement.executeQuery()) {
+ return parser.parse(rs);
+ }
+ }
+ }
+ }
+
+ /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Database identifiers,
+   * containing "name" (representing databaseName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> DATABASE_RESULT_SET_HANDLER =
+ rs -> {
+ List<SnowflakeIdentifier> databases = Lists.newArrayList();
+ while (rs.next()) {
+ String databaseName = rs.getString("name");
+ databases.add(SnowflakeIdentifier.ofDatabase(databaseName));
+ }
+ return databases;
+ };
+
+ /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Schema identifiers,
+   * containing "database_name" and "name" (representing schemaName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> SCHEMA_RESULT_SET_HANDLER =
+ rs -> {
+ List<SnowflakeIdentifier> schemas = Lists.newArrayList();
+ while (rs.next()) {
+ String databaseName = rs.getString("database_name");
+ String schemaName = rs.getString("name");
+ schemas.add(SnowflakeIdentifier.ofSchema(databaseName, schemaName));
+ }
+ return schemas;
+ };
+
+ /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Table identifiers,
+   * containing "database_name", "schema_name", and "name" (representing tableName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> TABLE_RESULT_SET_HANDLER =
+ rs -> {
+ List<SnowflakeIdentifier> tables = Lists.newArrayList();
+ while (rs.next()) {
+ String databaseName = rs.getString("database_name");
+ String schemaName = rs.getString("schema_name");
+ String tableName = rs.getString("name");
+          tables.add(SnowflakeIdentifier.ofTable(databaseName, schemaName, tableName));
+ }
+ return tables;
+ };
+
+ /**
+   * Expects to handle ResultSets representing a single record holding Snowflake Iceberg metadata.
+   */
+  public static final ResultSetParser<SnowflakeTableMetadata> TABLE_METADATA_RESULT_SET_HANDLER =
+ rs -> {
+ if (!rs.next()) {
+ return null;
+ }
+
+ String rawJsonVal = rs.getString("METADATA");
+ return SnowflakeTableMetadata.parseJson(rawJsonVal);
+ };
+
+ private final JdbcClientPool connectionPool;
+ private QueryHarness queryHarness;
+
+ JdbcSnowflakeClient(JdbcClientPool conn) {
+    Preconditions.checkArgument(null != conn, "JdbcClientPool must be non-null");
+ connectionPool = conn;
+ queryHarness = new QueryHarness();
+ }
+
+ @VisibleForTesting
+ void setQueryHarness(QueryHarness queryHarness) {
+ this.queryHarness = queryHarness;
+ }
+
+ /**
+   * For rare cases where PreparedStatements aren't supported for user-supplied identifiers intended
+   * for use in special LIKE clauses, we can sanitize by "broadening" the identifier with
+   * single-character wildcards and manually post-filter client-side.
+   *
+   * <p>Note: This sanitization approach intentionally "broadens" the scope of matching results;
+   * callers must be able to handle this method returning an all-wildcard expression; i.e. the
+   * caller must treat the usage of the LIKE clause as only an optional optimization, and should
+   * post-filter for correctness as if the LIKE clause wasn't present in the query at all.
+ */
+ @VisibleForTesting
+ String sanitizeIdentifierWithWildcardForLikeClause(String identifier) {
+    // Restrict identifiers to the "Unquoted object identifiers" syntax documented at
+    // https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
+    //
+    // Use a strict allowlist of characters, replace everything *not* matching the character set
+    // with "_", which is used as a single-character wildcard in Snowflake.
+ String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
+ if (sanitized.startsWith("$")) {
+ sanitized = "_" + sanitized.substring(1);
+ }
+ return sanitized;
+ }
+
+ @Override
+ public boolean databaseExists(SnowflakeIdentifier database) {
+ Preconditions.checkArgument(
+ database.type() == SnowflakeIdentifier.Type.DATABASE,
+ "databaseExists requires a DATABASE identifier, got '%s'",
+ database);
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW DATABASES queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
+ final String finalQuery =
+ String.format(
+ "SHOW DATABASES LIKE '%s' IN ACCOUNT",
+            sanitizeIdentifierWithWildcardForLikeClause(database.databaseName()));
+ List<SnowflakeIdentifier> databases;
+ try {
+ databases =
+ connectionPool.run(
+                conn -> queryHarness.query(conn, finalQuery, DATABASE_RESULT_SET_HANDLER));
+ } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if database '%s' exists", database);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(
+ e, "Interrupted while checking if database '%s' exists", database);
+ }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way
+    // it can for predicates in SELECT statements.
+    databases.removeIf(db -> !database.databaseName().equalsIgnoreCase(db.databaseName()));
+ return !databases.isEmpty();
+ }
+
+ @Override
+ public boolean schemaExists(SnowflakeIdentifier schema) {
+ Preconditions.checkArgument(
+ schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+ "schemaExists requires a SCHEMA identifier, got '%s'",
+ schema);
+
+    if (!databaseExists(SnowflakeIdentifier.ofDatabase(schema.databaseName()))) {
+ return false;
+ }
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW SCHEMAS queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
+ final String finalQuery =
+ String.format(
+ "SHOW SCHEMAS LIKE '%s' IN DATABASE IDENTIFIER(?)",
+ sanitizeIdentifierWithWildcardForLikeClause(schema.schemaName()));
+ List<SnowflakeIdentifier> schemas;
+ try {
+ schemas =
+ connectionPool.run(
+ conn ->
+ queryHarness.query(
+                        conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, schema.databaseName()));
+ } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if schema '%s' exists", schema);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(
+ e, "Interrupted while checking if schema '%s' exists", schema);
+ }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way
+    // it can for predicates in SELECT statements.
+    schemas.removeIf(sc -> !schema.schemaName().equalsIgnoreCase(sc.schemaName()));
+ return !schemas.isEmpty();
+ }
+
+ @Override
+ public List<SnowflakeIdentifier> listDatabases() {
+ List<SnowflakeIdentifier> databases;
+ try {
+ databases =
+ connectionPool.run(
+ conn ->
+ queryHarness.query(
+                        conn, "SHOW DATABASES IN ACCOUNT", DATABASE_RESULT_SET_HANDLER));
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to list databases");
+ } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(e, "Interrupted while listing databases");
+ }
+ databases.forEach(
+ db ->
+ Preconditions.checkState(
+ db.type() == SnowflakeIdentifier.Type.DATABASE,
+ "Expected DATABASE, got identifier '%s'",
+ db));
+ return databases;
+ }
+
+ @Override
+ public List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope) {
+ StringBuilder baseQuery = new StringBuilder("SHOW SCHEMAS");
+ String[] queryParams = null;
+ switch (scope.type()) {
+ case ROOT:
+ // account-level listing
+ baseQuery.append(" IN ACCOUNT");
+ break;
+ case DATABASE:
+ // database-level listing
+ baseQuery.append(" IN DATABASE IDENTIFIER(?)");
+ queryParams = new String[] {scope.toIdentifierString()};
+ break;
+ default:
+ throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listSchemas: %s", scope));
+ }
+
+ final String finalQuery = baseQuery.toString();
+ final String[] finalQueryParams = queryParams;
+ List<SnowflakeIdentifier> schemas;
+ try {
+ schemas =
+ connectionPool.run(
+ conn ->
+ queryHarness.query(
+                        conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, finalQueryParams));
+ } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to list schemas for scope '%s'", scope);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(
+ e, "Interrupted while listing schemas for scope '%s'", scope);
+ }
+ schemas.forEach(
+ schema ->
+ Preconditions.checkState(
+ schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+ "Expected SCHEMA, got identifier '%s' for scope '%s'",
+ schema,
+ scope));
+ return schemas;
+ }
+
+ @Override
+  public List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope) {
+ StringBuilder baseQuery = new StringBuilder("SHOW ICEBERG TABLES");
+ String[] queryParams = null;
+ switch (scope.type()) {
+ case ROOT:
+ // account-level listing
+ baseQuery.append(" IN ACCOUNT");
+ break;
+ case DATABASE:
+ // database-level listing
+ baseQuery.append(" IN DATABASE IDENTIFIER(?)");
+ queryParams = new String[] {scope.toIdentifierString()};
+ break;
+ case SCHEMA:
+ // schema-level listing
+ baseQuery.append(" IN SCHEMA IDENTIFIER(?)");
+ queryParams = new String[] {scope.toIdentifierString()};
+ break;
+ default:
+ throw new IllegalArgumentException(
+            String.format("Unsupported scope type for listIcebergTables: %s", scope));
+ }
+
+ final String finalQuery = baseQuery.toString();
+ final String[] finalQueryParams = queryParams;
+ List<SnowflakeIdentifier> tables;
+ try {
+ tables =
+ connectionPool.run(
+ conn ->
+                    queryHarness.query(conn, finalQuery, TABLE_RESULT_SET_HANDLER, finalQueryParams));
+ } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to list tables for scope '%s'", scope);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(
+ e, "Interrupted while listing tables for scope '%s'", scope);
+ }
+ tables.forEach(
+ table ->
+ Preconditions.checkState(
+ table.type() == SnowflakeIdentifier.Type.TABLE,
+ "Expected TABLE, got identifier '%s' for scope '%s'",
+ table,
+ scope));
+ return tables;
+ }
+
+ @Override
+  public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) {
+ Preconditions.checkArgument(
+ tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE,
+ "loadTableMetadata requires a TABLE identifier, got '%s'",
+ tableIdentifier);
+ SnowflakeTableMetadata tableMeta;
+ try {
+      final String finalQuery = "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA";
+ tableMeta =
+ connectionPool.run(
+ conn ->
+ queryHarness.query(
+ conn,
+ finalQuery,
+ TABLE_METADATA_RESULT_SET_HANDLER,
+ tableIdentifier.toIdentifierString()));
+ } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to get table metadata for '%s'", tableIdentifier);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(
+          e, "Interrupted while getting table metadata for '%s'", tableIdentifier);
+ }
+ return tableMeta;
+ }
+
+ @Override
+ public void close() {
+ connectionPool.close();
+ }
+}
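
Aside (not part of the patch): the LIKE-clause sanitization above is self-contained enough to illustrate standalone. The sketch below mirrors the regex-and-prefix logic of `sanitizeIdentifierWithWildcardForLikeClause`; the class name `SanitizeDemo` is ours, for illustration only.

```java
public class SanitizeDemo {
    // Mirrors the allowlist logic: anything outside [a-zA-Z0-9_$] is replaced
    // with "_", which Snowflake treats as a single-character wildcard in LIKE
    // patterns. A leading "$" (not valid for unquoted identifiers) is also
    // broadened to "_". Callers must post-filter since "_" over-matches.
    static String sanitize(String identifier) {
        String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
        if (sanitized.startsWith("$")) {
            sanitized = "_" + sanitized.substring(1);
        }
        return sanitized;
    }

    public static void main(String[] args) {
        // '-' is outside the allowlist, so "MY-DB" broadens to the pattern
        // "MY_DB", which also matches e.g. "MYXDB"; hence the client-side
        // equalsIgnoreCase post-filtering seen in databaseExists/schemaExists.
        System.out.println(sanitize("MY-DB"));   // MY_DB
        System.out.println(sanitize("$START"));  // _START
        System.out.println(sanitize("ok_name$1")); // unchanged
    }
}
```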
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java
new file mode 100644
index 0000000000..28dacbca98
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class NamespaceHelpers {
+ private static final int MAX_NAMESPACE_DEPTH = 2;
+ private static final int NAMESPACE_ROOT_LEVEL = 0;
+ private static final int NAMESPACE_DB_LEVEL = 1;
+ private static final int NAMESPACE_SCHEMA_LEVEL = 2;
+
+ private NamespaceHelpers() {}
+
+ /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) {
+ switch (namespace.length()) {
+ case NAMESPACE_ROOT_LEVEL:
+ return SnowflakeIdentifier.ofRoot();
+ case NAMESPACE_DB_LEVEL:
+        return SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1));
+      case NAMESPACE_SCHEMA_LEVEL:
+        return SnowflakeIdentifier.ofSchema(
+            namespace.level(NAMESPACE_DB_LEVEL - 1), namespace.level(NAMESPACE_SCHEMA_LEVEL - 1));
+ default:
+ throw new IllegalArgumentException(
+            String.format(
+                "Snowflake max namespace level is %d, got namespace '%s'",
+                MAX_NAMESPACE_DEPTH, namespace));
+ }
+ }
+
+ /**
+   * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the identifier must have
+   * exactly the right namespace depth to represent a fully-qualified Snowflake table identifier.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier identifier) {
+    SnowflakeIdentifier namespaceScope = toSnowflakeIdentifier(identifier.namespace());
+ Preconditions.checkArgument(
+ namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "Namespace portion of '%s' must be at the SCHEMA level, got namespaceScope '%s'",
+ identifier,
+ namespaceScope);
+ return SnowflakeIdentifier.ofTable(
+        namespaceScope.databaseName(), namespaceScope.schemaName(), identifier.name());
+ }
+
+ /**
+   * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an equivalent Iceberg
+ * Namespace; throws IllegalArgumentException if not an appropriate type.
+ */
+ public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) {
+ switch (identifier.type()) {
+ case ROOT:
+ return Namespace.empty();
+ case DATABASE:
+ return Namespace.of(identifier.databaseName());
+ case SCHEMA:
+        return Namespace.of(identifier.databaseName(), identifier.schemaName());
+ default:
+ throw new IllegalArgumentException(
+            String.format("Cannot convert identifier '%s' to Namespace", identifier));
+ }
+ }
+
+ /**
+   * Converts a SnowflakeIdentifier to an equivalent Iceberg TableIdentifier; the identifier must be
+   * of type TABLE.
+   */
+  public static TableIdentifier toIcebergTableIdentifier(SnowflakeIdentifier identifier) {
+ Preconditions.checkArgument(
+ identifier.type() == SnowflakeIdentifier.Type.TABLE,
+ "SnowflakeIdentifier must be type TABLE, got '%s'",
+ identifier);
+ return TableIdentifier.of(
+        identifier.databaseName(), identifier.schemaName(), identifier.tableName());
+ }
+}
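
Aside (not part of the patch): the depth-to-type mapping in `toSnowflakeIdentifier(Namespace)` can be sketched standalone. Here plain `String...` levels stand in for Iceberg's `Namespace`, and a `String` result stands in for `SnowflakeIdentifier` — both assumptions made only to keep the example self-contained.

```java
public class NamespaceDepthDemo {
    // Mirrors the switch in NamespaceHelpers.toSnowflakeIdentifier(Namespace):
    // depth 0 -> ROOT, depth 1 -> DATABASE, depth 2 -> SCHEMA, deeper -> error.
    static String classify(String... levels) {
        switch (levels.length) {
            case 0:
                return "ROOT";
            case 1:
                return "DATABASE:" + levels[0];
            case 2:
                return "SCHEMA:" + levels[0] + "." + levels[1];
            default:
                throw new IllegalArgumentException(
                    "Snowflake max namespace level is 2, got depth " + levels.length);
        }
    }

    public static void main(String[] args) {
        System.out.println(classify());                  // ROOT
        System.out.println(classify("DB1"));             // DATABASE:DB1
        System.out.println(classify("DB1", "SCHEMA1"));  // SCHEMA:DB1.SCHEMA1
    }
}
```

This is why `listTables` below can insist on a SCHEMA-level scope: a two-level Iceberg namespace maps one-to-one onto Snowflake's database.schema hierarchy.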
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
new file mode 100644
index 0000000000..19302d5784
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+ implements Closeable, SupportsNamespaces, Configurable<Object> {
+ private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
+
+ // Injectable factory for testing purposes.
+ static class FileIOFactory {
+    public FileIO newFileIO(String impl, Map<String, String> properties, Object hadoopConf) {
+ return CatalogUtil.loadFileIO(impl, properties, hadoopConf);
+ }
+ }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class);
+
+ private CloseableGroup closeableGroup;
+ private Object conf;
+ private String catalogName;
+ private Map<String, String> catalogProperties;
+ private FileIOFactory fileIOFactory;
+ private SnowflakeClient snowflakeClient;
+
+ public SnowflakeCatalog() {}
+
+ @Override
+ public List<TableIdentifier> listTables(Namespace namespace) {
+    SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+ Preconditions.checkArgument(
+ scope.type() == SnowflakeIdentifier.Type.SCHEMA,
+ "listTables must be at SCHEMA level; got %s from namespace %s",
+ scope,
+ namespace);
+
+    List<SnowflakeIdentifier> sfTables = snowflakeClient.listIcebergTables(scope);
+
+ return sfTables.stream()
+ .map(NamespaceHelpers::toIcebergTableIdentifier)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier identifier, boolean purge) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support dropTable");
+ }
+
+ @Override
+ public void renameTable(TableIdentifier from, TableIdentifier to) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support renameTable");
+ }
+
+ @Override
+ public void initialize(String name, Map<String, String> properties) {
+ String uri = properties.get(CatalogProperties.URI);
+    Preconditions.checkArgument(null != uri, "JDBC connection URI is required");
+ try {
+      // We'll ensure the expected JDBC driver implementation class is initialized through
+      // reflection regardless of which classloader ends up using this JdbcSnowflakeClient, but
+      // we'll only warn if the expected driver fails to load, since users may use repackaged or
+      // custom JDBC drivers for Snowflake communication.
+ Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL);
+ } catch (ClassNotFoundException cnfe) {
+      LOG.warn(
+          "Failed to load expected JDBC SnowflakeDriver - if queries fail by failing"
+              + " to find a suitable driver for jdbc:snowflake:// URIs, you must add the Snowflake"
+              + " JDBC driver to your jars/packages",
+          cnfe);
+ }
+ JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);
+
+    initialize(name, new JdbcSnowflakeClient(connectionPool), new FileIOFactory(), properties);
+ }
+
+ /**
+ * Initialize using caller-supplied SnowflakeClient and FileIO.
+ *
+ * @param name The name of the catalog, defaults to "snowflake_catalog"
+   * @param snowflakeClient The client encapsulating network communication with Snowflake
+   * @param fileIOFactory The {@link FileIOFactory} to use to instantiate a new FileIO for each new
+   *     table operation
+ * @param properties The catalog options to use and propagate to dependencies
+ */
+ @SuppressWarnings("checkstyle:HiddenField")
+ void initialize(
+ String name,
+ SnowflakeClient snowflakeClient,
+ FileIOFactory fileIOFactory,
+ Map<String, String> properties) {
+    Preconditions.checkArgument(null != snowflakeClient, "snowflakeClient must be non-null");
+    Preconditions.checkArgument(null != fileIOFactory, "fileIOFactory must be non-null");
+ this.catalogName = name == null ? DEFAULT_CATALOG_NAME : name;
+ this.snowflakeClient = snowflakeClient;
+ this.fileIOFactory = fileIOFactory;
+ this.catalogProperties = properties;
+ this.closeableGroup = new CloseableGroup();
+ closeableGroup.addCloseable(snowflakeClient);
+ closeableGroup.setSuppressCloseFailure(true);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (null != closeableGroup) {
+ closeableGroup.close();
+ }
+ }
+
+ @Override
+ public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support createNamespace");
+ }
+
+ @Override
+ public List<Namespace> listNamespaces(Namespace namespace) {
+ SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+ List<SnowflakeIdentifier> results = null;
+ switch (scope.type()) {
+ case ROOT:
+ results = snowflakeClient.listDatabases();
+ break;
+ case DATABASE:
+ results = snowflakeClient.listSchemas(scope);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "listNamespaces must be at either ROOT or DATABASE level; got %s from namespace %s",
+ scope, namespace));
+ }
+
+ return results.stream().map(NamespaceHelpers::toIcebergNamespace).collect(Collectors.toList());
+ }
+
+ @Override
+ public Map<String, String> loadNamespaceMetadata(Namespace namespace)
+ throws NoSuchNamespaceException {
+ SnowflakeIdentifier id = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+ boolean namespaceExists;
+ switch (id.type()) {
+ case DATABASE:
+ namespaceExists = snowflakeClient.databaseExists(id);
+ break;
+ case SCHEMA:
+ namespaceExists = snowflakeClient.schemaExists(id);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "loadNamespaceMetadata must be at either DATABASE or SCHEMA level; got %s from namespace %s",
+ id, namespace));
+ }
+ if (namespaceExists) {
+ return ImmutableMap.of();
+ } else {
+ throw new NoSuchNamespaceException(
+ "Namespace '%s' with snowflake identifier '%s' doesn't exist", namespace, id);
+ }
+ }
+
+ @Override
+ public boolean dropNamespace(Namespace namespace) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support dropNamespace");
+ }
+
+ @Override
+ public boolean setProperties(Namespace namespace, Map<String, String> properties) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support setProperties");
+ }
+
+ @Override
+ public boolean removeProperties(Namespace namespace, Set<String> properties) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support removeProperties");
+ }
+
+ @Override
+ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+ String fileIOImpl = DEFAULT_FILE_IO_IMPL;
+ if (catalogProperties.containsKey(CatalogProperties.FILE_IO_IMPL)) {
+ fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL);
+ }
+
+ // Initialize a fresh FileIO for each TableOperations created, because some FileIO
+ // implementations such as S3FileIO can become bound to a single S3 bucket. Additionally,
+ // FileIO implementations often support only a finite set of one or more URI schemes (i.e.
+ // S3FileIO only supports s3/s3a/s3n, and even ResolvingFileIO only supports the combination
+ // of schemes registered for S3FileIO and HadoopFileIO). Individual catalogs may need to
+ // support tables across different cloud/storage providers with disjoint FileIO implementations.
+ FileIO fileIO = fileIOFactory.newFileIO(fileIOImpl, catalogProperties, conf);
+ closeableGroup.addCloseable(fileIO);
+ return new SnowflakeTableOperations(snowflakeClient, fileIO, catalogName, tableIdentifier);
+ }
+
+ @Override
+ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+ throw new UnsupportedOperationException(
+ "SnowflakeCatalog does not currently support defaultWarehouseLocation");
+ }
+
+ @Override
+ public void setConf(Object conf) {
+ this.conf = conf;
+ }
+}
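The namespace dispatch in listNamespaces above (ROOT lists databases, a single DATABASE level lists schemas, anything deeper is rejected) can be exercised with a small, JDK-only sketch. The map standing in for the SnowflakeClient, and all names here, are illustrative assumptions, not the Iceberg API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class Main {
  // Hypothetical stand-in for the SnowflakeClient: databases mapped to their schemas.
  static final Map<String, List<String>> SCHEMAS_BY_DB =
      Map.of("DB1", List.of("S1", "S2"), "DB2", List.of("S3"));

  // Mirrors the dispatch in listNamespaces: ROOT lists databases, a single
  // DATABASE level lists its schemas, deeper scopes are rejected.
  static List<String> listNamespaces(String... namespace) {
    switch (namespace.length) {
      case 0: // ROOT scope
        return new ArrayList<>(new TreeSet<>(SCHEMAS_BY_DB.keySet()));
      case 1: { // DATABASE scope
        List<String> out = new ArrayList<>();
        for (String schema : SCHEMAS_BY_DB.getOrDefault(namespace[0], List.of())) {
          out.add(namespace[0] + "." + schema);
        }
        return out;
      }
      default:
        throw new IllegalArgumentException(
            "listNamespaces must be at either ROOT or DATABASE level");
    }
  }

  public static void main(String[] args) {
    System.out.println(listNamespaces());      // [DB1, DB2]
    System.out.println(listNamespaces("DB1")); // [DB1.S1, DB1.S2]
  }
}
```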
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java
new file mode 100644
index 0000000000..2dfadb9a65
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * This interface abstracts out the underlying communication protocols for contacting Snowflake to
+ * obtain the various resource representations defined under "entities". Classes using this
+ * interface should minimize assumptions about whether an underlying client uses e.g. REST, JDBC or
+ * other underlying libraries/protocols.
+ */
+interface SnowflakeClient extends Closeable {
+
+ /** Returns true if the database exists, false otherwise. */
+ boolean databaseExists(SnowflakeIdentifier database);
+
+ /** Returns true if the schema and its parent database exist, false otherwise. */
+ boolean schemaExists(SnowflakeIdentifier schema);
+
+ /** Lists all Snowflake databases within the currently configured account. */
+ List<SnowflakeIdentifier> listDatabases();
+
+ /**
+ * Lists all Snowflake schemas within a given scope. Returned SnowflakeIdentifiers must have
+ * type() == SnowflakeIdentifier.Type.SCHEMA.
+ *
+ * @param scope The scope in which to list, which may be ROOT or a single DATABASE.
+ */
+ List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope);
+
+ /**
+ * Lists all Snowflake Iceberg tables within a given scope. Returned SnowflakeIdentifiers must
+ * have type() == SnowflakeIdentifier.Type.TABLE.
+ *
+ * @param scope The scope in which to list, which may be ROOT, a DATABASE, or a SCHEMA.
+ */
+ List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope);
+
+ /**
+ * Returns Snowflake-level metadata containing locations to more detailed metadata.
+ *
+ * @param tableIdentifier The fully-qualified identifier that must be of type
+ * SnowflakeIdentifier.Type.TABLE.
+ */
+ SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier);
+}
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java
new file mode 100644
index 0000000000..3082b1d8e5
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Since the SnowflakeCatalog supports exactly two levels of Iceberg Namespaces, corresponding
+ * directly to the "database" and "schema" portions of Snowflake's resource model, this class
+ * represents a pre-validated and structured representation of a fully-qualified Snowflake resource
+ * identifier. Snowflake-specific helper libraries should operate on this representation instead of
+ * directly operating on TableIdentifiers or Namespaces wherever possible to avoid duplication of
+ * parsing/validation logic for Iceberg TableIdentifier/Namespace levels.
+ */
+class SnowflakeIdentifier {
+ public enum Type {
+ ROOT,
+ DATABASE,
+ SCHEMA,
+ TABLE
+ }
+
+ private final String databaseName;
+ private final String schemaName;
+ private final String tableName;
+ private final Type type;
+
+ private SnowflakeIdentifier(String databaseName, String schemaName, String tableName, Type type) {
+ this.databaseName = databaseName;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.type = type;
+ }
+
+ public static SnowflakeIdentifier ofRoot() {
+ return new SnowflakeIdentifier(null, null, null, Type.ROOT);
+ }
+
+ public static SnowflakeIdentifier ofDatabase(String databaseName) {
+ Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+ return new SnowflakeIdentifier(databaseName, null, null, Type.DATABASE);
+ }
+
+ public static SnowflakeIdentifier ofSchema(String databaseName, String schemaName) {
+ Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+ Preconditions.checkArgument(null != schemaName, "schemaName must be non-null");
+ return new SnowflakeIdentifier(databaseName, schemaName, null, Type.SCHEMA);
+ }
+
+ public static SnowflakeIdentifier ofTable(
+ String databaseName, String schemaName, String tableName) {
+ Preconditions.checkArgument(null != databaseName, "databaseName must be non-null");
+ Preconditions.checkArgument(null != schemaName, "schemaName must be non-null");
+ Preconditions.checkArgument(null != tableName, "tableName must be non-null");
+ return new SnowflakeIdentifier(databaseName, schemaName, tableName, Type.TABLE);
+ }
+
+ /**
+ * If type is TABLE, expect non-null databaseName, schemaName, and tableName. If type is SCHEMA,
+ * expect non-null databaseName and schemaName. If type is DATABASE, expect non-null databaseName.
+ * If type is ROOT, expect all of databaseName, schemaName, and tableName to be null.
+ */
+ public Type type() {
+ return type;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public String databaseName() {
+ return databaseName;
+ }
+
+ public String schemaName() {
+ return schemaName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof SnowflakeIdentifier)) {
+ return false;
+ }
+
+ SnowflakeIdentifier that = (SnowflakeIdentifier) o;
+ return Objects.equal(this.databaseName, that.databaseName)
+ && Objects.equal(this.schemaName, that.schemaName)
+ && Objects.equal(this.tableName, that.tableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(databaseName, schemaName, tableName);
+ }
+
+ /** Returns this identifier as a String suitable for use in a Snowflake IDENTIFIER param. */
+ public String toIdentifierString() {
+ switch (type()) {
+ case TABLE:
+ return String.format("%s.%s.%s", databaseName, schemaName, tableName);
+ case SCHEMA:
+ return String.format("%s.%s", databaseName, schemaName);
+ case DATABASE:
+ return databaseName;
+ default:
+ return "";
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s: '%s'", type(), toIdentifierString());
+ }
+}
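The level rules spelled out in the javadoc above (each additional name component moves one level deeper, matching the ofRoot/ofDatabase/ofSchema/ofTable factories) can be sketched with plain JDK code. The enum and helper below are illustrative assumptions, not the actual Iceberg classes:

```java
public class Main {
  // Illustrative copy of the identifier levels from SnowflakeIdentifier.Type.
  enum Type { ROOT, DATABASE, SCHEMA, TABLE }

  // One name component per level: 0 = ROOT, 1 = DATABASE, 2 = SCHEMA, 3 = TABLE.
  static Type scopeFor(String... levels) {
    switch (levels.length) {
      case 0: return Type.ROOT;
      case 1: return Type.DATABASE;
      case 2: return Type.SCHEMA;
      case 3: return Type.TABLE;
      default:
        throw new IllegalArgumentException("At most 3 levels, got " + levels.length);
    }
  }

  public static void main(String[] args) {
    System.out.println(scopeFor());                       // ROOT
    System.out.println(scopeFor("DB1"));                  // DATABASE
    System.out.println(scopeFor("DB1", "SCHEMA1"));       // SCHEMA
    System.out.println(scopeFor("DB1", "SCHEMA1", "T1")); // TABLE
  }
}
```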
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java
new file mode 100644
index 0000000000..c550b3e13a
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class SnowflakeTableMetadata {
+ public static final Pattern SNOWFLAKE_AZURE_PATTERN =
+ Pattern.compile("azure://([^/]+)/([^/]+)/(.*)");
+
+ private final String snowflakeMetadataLocation;
+ private final String icebergMetadataLocation;
+ private final String status;
+
+ // Note: Since not all sources will necessarily come from a raw JSON representation, this raw
+ // JSON should only be considered a convenient debugging field. Equality of two
+ // SnowflakeTableMetadata instances should not depend on equality of this field.
+ private final String rawJsonVal;
+
+ SnowflakeTableMetadata(
+ String snowflakeMetadataLocation,
+ String icebergMetadataLocation,
+ String status,
+ String rawJsonVal) {
+ this.snowflakeMetadataLocation = snowflakeMetadataLocation;
+ this.icebergMetadataLocation = icebergMetadataLocation;
+ this.status = status;
+ this.rawJsonVal = rawJsonVal;
+ }
+
+ /** Storage location of table metadata in Snowflake's path syntax. */
+ public String snowflakeMetadataLocation() {
+ return snowflakeMetadataLocation;
+ }
+
+ /** Storage location of table metadata in Iceberg's path syntax. */
+ public String icebergMetadataLocation() {
+ return icebergMetadataLocation;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof SnowflakeTableMetadata)) {
+ return false;
+ }
+
+ // Only consider parsed fields, not the raw JSON that may or may not be the original source of
+ // this instance.
+ SnowflakeTableMetadata that = (SnowflakeTableMetadata) o;
+ return Objects.equal(this.snowflakeMetadataLocation, that.snowflakeMetadataLocation)
+ && Objects.equal(this.icebergMetadataLocation, that.icebergMetadataLocation)
+ && Objects.equal(this.status, that.status);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(snowflakeMetadataLocation, icebergMetadataLocation, status);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "snowflakeMetadataLocation: '%s', icebergMetadataLocation: '%s', status: '%s'",
+ snowflakeMetadataLocation, icebergMetadataLocation, status);
+ }
+
+ public String toDebugString() {
+ return String.format("%s, rawJsonVal: %s", toString(), rawJsonVal);
+ }
+
+ /**
+ * Translates from Snowflake's path syntax to Iceberg's path syntax for paths matching known
+ * non-compatible Snowflake paths. Throws IllegalArgumentException if the prefix of the
+ * snowflakeLocation is a known non-compatible path syntax but fails to match the expected path
+ * components for a successful translation.
+ */
+ public static String snowflakeLocationToIcebergLocation(String snowflakeLocation) {
+ if (snowflakeLocation.startsWith("azure://")) {
+ // Convert from expected path of the form:
+ // azure://account.blob.core.windows.net/container/volumepath
+ // to:
+ // wasbs://[email protected]/volumepath
+ Matcher matcher = SNOWFLAKE_AZURE_PATTERN.matcher(snowflakeLocation);
+ Preconditions.checkArgument(
+ matcher.matches(),
+ "Location '%s' failed to match pattern '%s'",
+ snowflakeLocation,
+ SNOWFLAKE_AZURE_PATTERN);
+ return String.format(
+ "wasbs://%s@%s/%s", matcher.group(2), matcher.group(1), matcher.group(3));
+ } else if (snowflakeLocation.startsWith("gcs://")) {
+ // Convert from expected path of the form:
+ // gcs://bucket/path
+ // to:
+ // gs://bucket/path
+ return "gs" + snowflakeLocation.substring(3);
+ }
+
+ return snowflakeLocation;
+ }
+
+ /**
+ * Factory method for parsing a JSON string containing expected Snowflake table metadata into a
+ * SnowflakeTableMetadata object.
+ */
+ public static SnowflakeTableMetadata parseJson(String json) {
+ JsonNode parsedVal;
+ try {
+ parsedVal = JsonUtil.mapper().readValue(json, JsonNode.class);
+ } catch (IOException ioe) {
+ throw new IllegalArgumentException(String.format("Malformed JSON: %s",
json), ioe);
+ }
+
+ String snowflakeMetadataLocation = JsonUtil.getString("metadataLocation",
parsedVal);
+ String status = JsonUtil.getStringOrNull("status", parsedVal);
+
+ String icebergMetadataLocation = snowflakeLocationToIcebergLocation(snowflakeMetadataLocation);
+
+ return new SnowflakeTableMetadata(
+ snowflakeMetadataLocation, icebergMetadataLocation, status, json);
+ }
+}
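The two rewrite rules in snowflakeLocationToIcebergLocation above (azure:// to wasbs://, gcs:// to gs://, everything else passed through) are easy to exercise standalone. This JDK-only sketch reproduces them; the class and method names here are illustrative, not the Iceberg API:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Main {
  // Same pattern as SNOWFLAKE_AZURE_PATTERN in the diff above.
  private static final Pattern AZURE = Pattern.compile("azure://([^/]+)/([^/]+)/(.*)");

  static String translate(String loc) {
    if (loc.startsWith("azure://")) {
      Matcher m = AZURE.matcher(loc);
      if (!m.matches()) {
        throw new IllegalArgumentException("Location failed to match pattern: " + loc);
      }
      // azure://account.blob.core.windows.net/container/path -> wasbs://container@account.../path
      return String.format("wasbs://%s@%s/%s", m.group(2), m.group(1), m.group(3));
    } else if (loc.startsWith("gcs://")) {
      // gcs://bucket/path -> gs://bucket/path
      return "gs" + loc.substring(3);
    }
    return loc; // e.g. s3:// paths pass through unchanged
  }

  public static void main(String[] args) {
    System.out.println(translate("azure://acct.blob.core.windows.net/cont/vol/file"));
    System.out.println(translate("gcs://bucket/path"));
    System.out.println(translate("s3://bucket/path"));
  }
}
```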
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
new file mode 100644
index 0000000000..1fe90d7eff
--- /dev/null
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SnowflakeTableOperations extends BaseMetastoreTableOperations {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnowflakeTableOperations.class);
+
+ private final FileIO fileIO;
+ private final TableIdentifier tableIdentifier;
+ private final SnowflakeIdentifier snowflakeIdentifierForTable;
+ private final String fullTableName;
+
+ private final SnowflakeClient snowflakeClient;
+
+ protected SnowflakeTableOperations(
+ SnowflakeClient snowflakeClient,
+ FileIO fileIO,
+ String catalogName,
+ TableIdentifier tableIdentifier) {
+ this.snowflakeClient = snowflakeClient;
+ this.fileIO = fileIO;
+ this.tableIdentifier = tableIdentifier;
+ this.snowflakeIdentifierForTable = NamespaceHelpers.toSnowflakeIdentifier(tableIdentifier);
+ this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier);
+ }
+
+ @Override
+ public void doRefresh() {
+ LOG.debug("Getting metadata location for table {}", tableIdentifier);
+ String location = loadTableMetadataLocation();
+ Preconditions.checkState(
+ location != null && !location.isEmpty(),
+ "Got null or empty location %s for table %s",
+ location,
+ tableIdentifier);
+ refreshFromMetadataLocation(location);
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIO;
+ }
+
+ @Override
+ protected String tableName() {
+ return fullTableName;
+ }
+
+ @VisibleForTesting
+ String fullTableName() {
+ return tableName();
+ }
+
+ private String loadTableMetadataLocation() {
+ SnowflakeTableMetadata metadata =
+ snowflakeClient.loadTableMetadata(snowflakeIdentifierForTable);
+
+ if (metadata == null) {
+ throw new NoSuchTableException("Cannot find table %s",
snowflakeIdentifierForTable);
+ }
+
+ if (!metadata.getStatus().equals("success")) {
+ LOG.warn(
+ "Got non-successful table metadata: {} with metadataLocation {} for
table {}",
+ metadata.getStatus(),
+ metadata.icebergMetadataLocation(),
+ snowflakeIdentifierForTable);
+ }
+
+ return metadata.icebergMetadataLocation();
+ }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java
new file mode 100644
index 0000000000..834dc3c6c4
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class FakeSnowflakeClient implements SnowflakeClient {
+ // In-memory lookup by database/schema/tableName to table metadata.
+ private final Map<String, Map<String, Map<String, SnowflakeTableMetadata>>> databases =
+ Maps.newTreeMap();
+ private boolean closed = false;
+
+ public FakeSnowflakeClient() {}
+
+ /**
+ * Also adds parent database/schema if they don't already exist. If the tableName already exists
+ * under the given database/schema, the value is replaced with the provided metadata.
+ */
+ public void addTable(SnowflakeIdentifier tableIdentifier, SnowflakeTableMetadata metadata) {
+ Preconditions.checkState(!closed, "Cannot call addTable after calling close()");
+ if (!databases.containsKey(tableIdentifier.databaseName())) {
+ databases.put(tableIdentifier.databaseName(), Maps.newTreeMap());
+ }
+ Map<String, Map<String, SnowflakeTableMetadata>> schemas =
+ databases.get(tableIdentifier.databaseName());
+ if (!schemas.containsKey(tableIdentifier.schemaName())) {
+ schemas.put(tableIdentifier.schemaName(), Maps.newTreeMap());
+ }
+ Map<String, SnowflakeTableMetadata> tables = schemas.get(tableIdentifier.schemaName());
+ tables.put(tableIdentifier.tableName(), metadata);
+ }
+
+ @Override
+ public boolean databaseExists(SnowflakeIdentifier database) {
+ return databases.containsKey(database.databaseName());
+ }
+
+ @Override
+ public boolean schemaExists(SnowflakeIdentifier schema) {
+ return databases.containsKey(schema.databaseName())
+ && databases.get(schema.databaseName()).containsKey(schema.schemaName());
+ }
+
+ @Override
+ public List<SnowflakeIdentifier> listDatabases() {
+ Preconditions.checkState(!closed, "Cannot call listDatabases after calling
close()");
+ List<SnowflakeIdentifier> databaseIdentifiers = Lists.newArrayList();
+ for (String databaseName : databases.keySet()) {
+ databaseIdentifiers.add(SnowflakeIdentifier.ofDatabase(databaseName));
+ }
+ return databaseIdentifiers;
+ }
+
+ @Override
+ public List<SnowflakeIdentifier> listSchemas(SnowflakeIdentifier scope) {
+ Preconditions.checkState(!closed, "Cannot call listSchemas after calling
close()");
+ List<SnowflakeIdentifier> schemas = Lists.newArrayList();
+ switch (scope.type()) {
+ case ROOT:
+ // "account-level" listing.
+ for (Map.Entry<String, Map<String, Map<String, SnowflakeTableMetadata>>> db :
+ databases.entrySet()) {
+ for (String schema : db.getValue().keySet()) {
+ schemas.add(SnowflakeIdentifier.ofSchema(db.getKey(), schema));
+ }
+ }
+ break;
+ case DATABASE:
+ String dbName = scope.databaseName();
+ if (databases.containsKey(dbName)) {
+ for (String schema : databases.get(dbName).keySet()) {
+ schemas.add(SnowflakeIdentifier.ofSchema(dbName, schema));
+ }
+ } else {
+ throw new UncheckedSQLException("Object does not exist: database:
'%s'", dbName);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported scope type for listSchemas: '%s'",
scope));
+ }
+ return schemas;
+ }
+
+ @Override
+ public List<SnowflakeIdentifier> listIcebergTables(SnowflakeIdentifier scope) {
+ Preconditions.checkState(!closed, "Cannot call listIcebergTables after calling close()");
+ List<SnowflakeIdentifier> tables = Lists.newArrayList();
+ switch (scope.type()) {
+ case ROOT:
+ {
+ // "account-level" listing.
+ for (Map.Entry<String, Map<String, Map<String, SnowflakeTableMetadata>>> db :
+ databases.entrySet()) {
+ for (Map.Entry<String, Map<String, SnowflakeTableMetadata>> schema :
+ db.getValue().entrySet()) {
+ for (String tableName : schema.getValue().keySet()) {
+ tables.add(SnowflakeIdentifier.ofTable(db.getKey(), schema.getKey(), tableName));
+ }
+ }
+ }
+ break;
+ }
+ case DATABASE:
+ {
+ String dbName = scope.databaseName();
+ if (databases.containsKey(dbName)) {
+ for (Map.Entry<String, Map<String, SnowflakeTableMetadata>> schema :
+ databases.get(dbName).entrySet()) {
+ for (String tableName : schema.getValue().keySet()) {
+ tables.add(SnowflakeIdentifier.ofTable(dbName, schema.getKey(), tableName));
+ }
+ }
+ } else {
+ throw new UncheckedSQLException("Object does not exist: database:
'%s'", dbName);
+ }
+ break;
+ }
+ case SCHEMA:
+ {
+ String dbName = scope.databaseName();
+ if (databases.containsKey(dbName)) {
+ String schemaName = scope.schemaName();
+ if (databases.get(dbName).containsKey(schemaName)) {
+ for (String tableName : databases.get(dbName).get(schemaName).keySet()) {
+ tables.add(SnowflakeIdentifier.ofTable(dbName, schemaName, tableName));
+ }
+ } else {
+ throw new UncheckedSQLException(
+ "Object does not exist: database.schema: '%s.%s'", dbName,
schemaName);
+ }
+ } else {
+ throw new UncheckedSQLException("Object does not exist: database:
'%s'", dbName);
+ }
+ break;
+ }
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported scope type for listing tables: %s",
scope));
+ }
+ return tables;
+ }
+
+ @Override
+ public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) {
+ Preconditions.checkState(!closed, "Cannot call loadTableMetadata after calling close()");
+
+ Preconditions.checkArgument(
+ tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE,
+ "tableIdentifier must be type TABLE, got: %s",
+ tableIdentifier);
+ String dbName = tableIdentifier.databaseName();
+ String schemaName = tableIdentifier.schemaName();
+ if (!databases.containsKey(dbName)
+ || !databases.get(dbName).containsKey(schemaName)
+ || !databases.get(dbName).get(schemaName).containsKey(tableIdentifier.tableName())) {
+ throw new UncheckedSQLException("Object does not exist: object: '%s'", tableIdentifier);
+ }
+ return databases.get(dbName).get(schemaName).get(tableIdentifier.tableName());
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
new file mode 100644
index 0000000000..1374ad8ac2
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcSnowflakeClientTest {
+ @Mock private Connection mockConnection;
+ @Mock private JdbcClientPool mockClientPool;
+ @Mock private JdbcSnowflakeClient.QueryHarness mockQueryHarness;
+ @Mock private ResultSet mockResultSet;
+
+ private JdbcSnowflakeClient snowflakeClient;
+
+ @Before
+ public void before() throws SQLException, InterruptedException {
+ snowflakeClient = new JdbcSnowflakeClient(mockClientPool);
+ snowflakeClient.setQueryHarness(mockQueryHarness);
+
+ doAnswer(invocation -> ((ClientPool.Action) invocation.getArguments()[0]).run(mockConnection))
+ .when(mockClientPool)
+ .run(any(ClientPool.Action.class));
+ doAnswer(
+ invocation ->
+ ((JdbcSnowflakeClient.ResultSetParser) invocation.getArguments()[2]).parse(mockResultSet))
+ .when(mockQueryHarness)
+ .query(
+ any(Connection.class),
+ any(String.class),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ ArgumentMatchers.<String>any());
+ }
+
+ @Test
+ public void testNullClientPoolInConstructor() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> new JdbcSnowflakeClient(null))
+ .withMessageContaining("JdbcClientPool must be non-null");
+ }
+
+ @Test
+ public void testDatabaseExists() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("name")).thenReturn("DB_1");
+
+ Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .isTrue();
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class));
+ }
+
+ @Test
+ public void testDatabaseExistsSpecialCharacters() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("name")).thenReturn("$DB_1$.'!@#%^&*");
+
+ Assertions.assertThat(
+            snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("$DB_1$.'!@#%^&*")))
+ .isTrue();
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW DATABASES LIKE '_DB_1$_________' IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class));
+ }
+
+ @Test
+ public void testDatabaseDoesntExistNoResults() throws SQLException {
+ when(mockResultSet.next()).thenReturn(false);
+
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .isFalse();
+ }
+
+ @Test
+ public void testDatabaseDoesntExistMismatchedResults() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("name")).thenReturn("DBZ1");
+
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .isFalse();
+ }
+
+ @Test
+ public void testSchemaExists() throws SQLException {
+ when(mockResultSet.next())
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMA_1");
+ when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+ Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+ .isTrue();
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class));
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW SCHEMAS LIKE 'SCHEMA_1' IN DATABASE IDENTIFIER(?)"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1"));
+ }
+
+ @Test
+ public void testSchemaExistsSpecialCharacters() throws SQLException {
+ when(mockResultSet.next())
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("$SCHEMA_1$.'!@#%^&*");
+ when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+ Assertions.assertThat(
+ snowflakeClient.schemaExists(
+ SnowflakeIdentifier.ofSchema("DB_1", "$SCHEMA_1$.'!@#%^&*")))
+ .isTrue();
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class));
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+            eq("SHOW SCHEMAS LIKE '_SCHEMA_1$_________' IN DATABASE IDENTIFIER(?)"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1"));
+ }
+
+ @Test
+ public void testSchemaDoesntExistMismatchDatabase() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("name")).thenReturn("DBZ1");
+
+ Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+ .isFalse();
+ }
+
+ @Test
+ public void testSchemaDoesntExistNoSchemaFound() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false).thenReturn(false);
+ when(mockResultSet.getString("name")).thenReturn("DB_1");
+
+ Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+ .isFalse();
+ }
+
+ @Test
+ public void testSchemaDoesntExistSchemaMismatch() throws SQLException {
+ when(mockResultSet.next())
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMAZ1");
+ when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+ Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+ .isFalse();
+ }
+
+ @Test
+ public void testListDatabasesInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("DB_2").thenReturn("DB_3");
+
+ List<SnowflakeIdentifier> actualList = snowflakeClient.listDatabases();
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW DATABASES IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofDatabase("DB_1"),
+ SnowflakeIdentifier.ofDatabase("DB_2"),
+ SnowflakeIdentifier.ofDatabase("DB_3"));
+ }
+
+ /**
+   * For the root scope, expect an underlying query to list schemas at the ACCOUNT level with no
+ * query parameters.
+ */
+ @Test
+ public void testListSchemasInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("database_name"))
+ .thenReturn("DB_1")
+ .thenReturn("DB_1")
+ .thenReturn("DB_2");
+ when(mockResultSet.getString("name"))
+ .thenReturn("SCHEMA_1")
+ .thenReturn("SCHEMA_2")
+ .thenReturn("SCHEMA_3");
+
+ List<SnowflakeIdentifier> actualList =
+ snowflakeClient.listSchemas(SnowflakeIdentifier.ofRoot());
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW SCHEMAS IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq(null));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+ SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"),
+ SnowflakeIdentifier.ofSchema("DB_2", "SCHEMA_3"));
+ }
+
+ /**
+   * For a DATABASE scope, expect an underlying query to list schemas at the DATABASE level and
+ * supply the database as a query param in an IDENTIFIER.
+ */
+ @Test
+ public void testListSchemasInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_2");
+
+ List<SnowflakeIdentifier> actualList =
+ snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW SCHEMAS IN DATABASE IDENTIFIER(?)"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1"));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+ SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"));
+ }
+
+ /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+ * UncheckedSQLException when listing schemas.
+ */
+ @Test
+  public void testListSchemasSQLException() throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new SQLException("Fake SQL exception"));
+ Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .withStackTraceContaining("Fake SQL exception");
+ }
+
+ /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+ * UncheckedInterruptedException when listing schemas.
+ */
+ @Test
+  public void testListSchemasInterruptedException() throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new InterruptedException("Fake interrupted exception"));
+ Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .withStackTraceContaining("Fake interrupted exception");
+ }
+
+ /**
+   * For the root/empty scope, expect an underlying query to list tables at the ACCOUNT level with
+ * no query parameters.
+ */
+ @Test
+ public void testListIcebergTablesInAccount() throws SQLException {
+ when(mockResultSet.next())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
+ when(mockResultSet.getString("database_name"))
+ .thenReturn("DB_1")
+ .thenReturn("DB_1")
+ .thenReturn("DB_1")
+ .thenReturn("DB_2");
+ when(mockResultSet.getString("schema_name"))
+ .thenReturn("SCHEMA_1")
+ .thenReturn("SCHEMA_1")
+ .thenReturn("SCHEMA_2")
+ .thenReturn("SCHEMA_3");
+ when(mockResultSet.getString("name"))
+ .thenReturn("TABLE_1")
+ .thenReturn("TABLE_2")
+ .thenReturn("TABLE_3")
+ .thenReturn("TABLE_4");
+
+ List<SnowflakeIdentifier> actualList =
+ snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofRoot());
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW ICEBERG TABLES IN ACCOUNT"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq(null));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"),
+ SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_3", "TABLE_4"));
+ }
+
+ /**
+   * For a DATABASE scope, expect an underlying query to list tables at the DATABASE level and
+ * supply the database as a query param in an IDENTIFIER.
+ */
+ @Test
+ public void testListIcebergTablesInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+ when(mockResultSet.getString("database_name"))
+ .thenReturn("DB_1")
+ .thenReturn("DB_1")
+ .thenReturn("DB_1");
+ when(mockResultSet.getString("schema_name"))
+ .thenReturn("SCHEMA_1")
+ .thenReturn("SCHEMA_1")
+ .thenReturn("SCHEMA_2");
+ when(mockResultSet.getString("name"))
+ .thenReturn("TABLE_1")
+ .thenReturn("TABLE_2")
+ .thenReturn("TABLE_3");
+
+ List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW ICEBERG TABLES IN DATABASE IDENTIFIER(?)"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1"));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"));
+ }
+
+ /**
+   * For a SCHEMA scope, expect an underlying query to list tables at the SCHEMA level and supply
+ * the schema as a query param in an IDENTIFIER.
+ */
+ @Test
+ public void testListIcebergTablesInSchema() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("schema_name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_1");
+    when(mockResultSet.getString("name")).thenReturn("TABLE_1").thenReturn("TABLE_2");
+
+ List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SHOW ICEBERG TABLES IN SCHEMA IDENTIFIER(?)"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1.SCHEMA_1"));
+
+ Assertions.assertThat(actualList)
+ .containsExactly(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"));
+ }
+
+ /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+ * UncheckedSQLException when listing tables.
+ */
+ @Test
+  public void testListIcebergTablesSQLException() throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new SQLException("Fake SQL exception"));
+ Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .withStackTraceContaining("Fake SQL exception");
+ }
+
+ /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+ * UncheckedInterruptedException when listing tables.
+ */
+ @Test
+ public void testListIcebergTablesInterruptedException()
+ throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new InterruptedException("Fake interrupted exception"));
+ Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+ .withStackTraceContaining("Fake interrupted exception");
+ }
+
+ /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the S3 path
+ * unaltered between snowflake/iceberg path representations.
+ */
+ @Test
+ public void testGetS3TableMetadata() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true);
+ when(mockResultSet.getString("METADATA"))
+ .thenReturn(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}");
+
+ SnowflakeTableMetadata actualMetadata =
+ snowflakeClient.loadTableMetadata(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1.SCHEMA_1.TABLE_1"));
+
+ SnowflakeTableMetadata expectedMetadata =
+ new SnowflakeTableMetadata(
+ "s3://tab1/metadata/v3.metadata.json",
+ "s3://tab1/metadata/v3.metadata.json",
+ "success",
+ null);
+ Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+ }
+
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the Azure
call, with the Azure
+ * path translated from an azure:// format to a wasbs:// format.
+ */
+ @Test
+ public void testGetAzureTableMetadata() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true);
+ when(mockResultSet.getString("METADATA"))
+ .thenReturn(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}");
+
+ SnowflakeTableMetadata actualMetadata =
+ snowflakeClient.loadTableMetadata(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1.SCHEMA_1.TABLE_1"));
+
+ SnowflakeTableMetadata expectedMetadata =
+ new SnowflakeTableMetadata(
+            "azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json",
+            "wasbs://[email protected]/tab3/metadata/v334.metadata.json",
+ "success",
+ null);
+ Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+ }
+
+ /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the GCS
+ * path translated from a gcs:// format to a gs:// format.
+ */
+ @Test
+ public void testGetGcsTableMetadata() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true);
+ when(mockResultSet.getString("METADATA"))
+ .thenReturn(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}");
+
+ SnowflakeTableMetadata actualMetadata =
+ snowflakeClient.loadTableMetadata(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+ verify(mockQueryHarness)
+ .query(
+ eq(mockConnection),
+ eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+ any(JdbcSnowflakeClient.ResultSetParser.class),
+ eq("DB_1.SCHEMA_1.TABLE_1"));
+
+ SnowflakeTableMetadata expectedMetadata =
+ new SnowflakeTableMetadata(
+ "gcs://tab5/metadata/v793.metadata.json",
+ "gs://tab5/metadata/v793.metadata.json",
+ "success",
+ null);
+ Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+ }
+
+  /** Malformed JSON from a ResultSet should propagate as an IllegalArgumentException. */
+ @Test
+ public void testGetTableMetadataMalformedJson() throws SQLException {
+ when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA")).thenReturn("{\"malformed_no_closing_bracket");
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ snowflakeClient.loadTableMetadata(
+                  SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+ .withMessageContaining("{\"malformed_no_closing_bracket");
+ }
+
+ /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+ * UncheckedSQLException when getting table metadata.
+ */
+ @Test
+  public void testGetTableMetadataSQLException() throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new SQLException("Fake SQL exception"));
+ Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+ .isThrownBy(
+ () ->
+ snowflakeClient.loadTableMetadata(
+                  SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+ .withStackTraceContaining("Fake SQL exception");
+ }
+
+ /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+ * UncheckedInterruptedException when getting table metadata.
+ */
+ @Test
+  public void testGetTableMetadataInterruptedException() throws SQLException, InterruptedException {
+ when(mockClientPool.run(any(ClientPool.Action.class)))
+ .thenThrow(new InterruptedException("Fake interrupted exception"));
+ Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+ .isThrownBy(
+ () ->
+ snowflakeClient.loadTableMetadata(
+                  SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+ .withStackTraceContaining("Fake interrupted exception");
+ }
+
+ /** Calling close() propagates to closing underlying client pool. */
+ @Test
+ public void testClose() {
+ snowflakeClient.close();
+ verify(mockClientPool).close();
+ }
+}
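The metadata tests above pin down the URI translation between Snowflake's path representation and Iceberg's: `s3://` passes through unchanged, `azure://<account>/<container>/<path>` becomes `wasbs://<container>@<account>/<path>`, and `gcs://` becomes `gs://`. A minimal standalone sketch of that translation rule follows; the class and method names are illustrative only, not the actual `SnowflakeTableMetadata` API.

```java
// Hypothetical sketch of the cloud-path translation the tests above exercise.
public class CloudPathSketch {
  static String toIcebergUri(String snowflakeUri) {
    if (snowflakeUri.startsWith("azure://")) {
      // azure://<account>/<container>/<path> -> wasbs://<container>@<account>/<path>
      String rest = snowflakeUri.substring("azure://".length());
      int accountEnd = rest.indexOf('/');
      String account = rest.substring(0, accountEnd);
      String containerAndPath = rest.substring(accountEnd + 1);
      int containerEnd = containerAndPath.indexOf('/');
      String container = containerAndPath.substring(0, containerEnd);
      String path = containerAndPath.substring(containerEnd + 1);
      return "wasbs://" + container + "@" + account + "/" + path;
    } else if (snowflakeUri.startsWith("gcs://")) {
      // gcs:// -> gs://, path unchanged
      return "gs://" + snowflakeUri.substring("gcs://".length());
    }
    // s3:// (and anything else) is unaltered between representations
    return snowflakeUri;
  }
}
```

For example, `azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json` maps to `wasbs://[email protected]/tab3/metadata/v334.metadata.json`, matching the expected value in testGetAzureTableMetadata.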
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
new file mode 100644
index 0000000000..2dd7fb6ec9
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class NamespaceHelpersTest {
+ @Test
+ public void testRoundTripRoot() {
+ Namespace icebergNamespace = Namespace.empty();
+ SnowflakeIdentifier snowflakeIdentifier =
+ NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofRoot());
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+ .isEqualTo(icebergNamespace);
+ }
+
+ @Test
+ public void testRoundTripDatabase() {
+ Namespace icebergNamespace = Namespace.of("DB1");
+ SnowflakeIdentifier snowflakeIdentifier =
+ NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofDatabase("DB1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+ .isEqualTo(icebergNamespace);
+ }
+
+ @Test
+ public void testRoundTripSchema() {
+ Namespace icebergNamespace = Namespace.of("DB1", "SCHEMA1");
+ SnowflakeIdentifier snowflakeIdentifier =
+ NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+ Assertions.assertThat(snowflakeIdentifier)
+ .isEqualTo(SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+ .isEqualTo(icebergNamespace);
+ }
+
+ @Test
+ public void testRoundTripTable() {
+    TableIdentifier icebergTable = TableIdentifier.of("DB1", "SCHEMA1", "TABLE1");
+    SnowflakeIdentifier snowflakeIdentifier = NamespaceHelpers.toSnowflakeIdentifier(icebergTable);
+ Assertions.assertThat(snowflakeIdentifier)
+ .isEqualTo(SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergTableIdentifier(snowflakeIdentifier))
+ .isEqualTo(icebergTable);
+ }
+
+ @Test
+ public void testToSnowflakeIdentifierMaxNamespaceLevel() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ NamespaceHelpers.toSnowflakeIdentifier(
+ Namespace.of("DB1", "SCHEMA1", "THIRD_NS_LVL")))
+ .withMessageContaining("max namespace level");
+ }
+
+ @Test
+ public void testToSnowflakeIdentifierTableBadNamespace() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ NamespaceHelpers.toSnowflakeIdentifier(
+                  TableIdentifier.of(Namespace.of("DB1_WITHOUT_SCHEMA"), "TABLE1")))
+ .withMessageContaining("must be at the SCHEMA level");
+ }
+
+ @Test
+ public void testToIcebergNamespaceTableFails() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ NamespaceHelpers.toIcebergNamespace(
+ SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+ .withMessageContaining("Cannot convert identifier");
+ }
+
+ @Test
+ public void testToIcebergTableIdentifier() {
+ Assertions.assertThat(
+ NamespaceHelpers.toIcebergTableIdentifier(
+ SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+ .isEqualTo(TableIdentifier.of("DB1", "SCHEMA1", "TABLE1"));
+ }
+
+ @Test
+ public void testToIcebergTableIdentifierWrongType() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ NamespaceHelpers.toIcebergTableIdentifier(
+ SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1")))
+ .withMessageContaining("must be type TABLE");
+ }
+}
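The round-trip tests above verify the level mapping between Iceberg namespaces and Snowflake scopes: an empty namespace maps to the ROOT scope, one level to a DATABASE, two levels to a SCHEMA, and anything deeper is rejected. A rough sketch of that rule (illustrative only; the real NamespaceHelpers returns SnowflakeIdentifier instances rather than strings):

```java
import java.util.Arrays;

// Illustrative sketch of the namespace-level rule the tests verify.
public class NamespaceLevelSketch {
  static String scopeOf(String... levels) {
    switch (levels.length) {
      case 0:
        return "ROOT"; // e.g. Namespace.empty()
      case 1:
        return "DATABASE"; // e.g. Namespace.of("DB1")
      case 2:
        return "SCHEMA"; // e.g. Namespace.of("DB1", "SCHEMA1")
      default:
        // mirrors the "max namespace level" rejection in testToSnowflakeIdentifierMaxNamespaceLevel
        throw new IllegalArgumentException(
            "Snowflake max namespace level is 2, got: " + Arrays.toString(levels));
    }
  }
}
```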
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
new file mode 100644
index 0000000000..9f66f352e8
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {
+
+ private static final String TEST_CATALOG_NAME = "slushLog";
+ private SnowflakeCatalog catalog;
+ private FakeSnowflakeClient fakeClient;
+ private InMemoryFileIO fakeFileIO;
+ private SnowflakeCatalog.FileIOFactory fakeFileIOFactory;
+ private Map<String, String> properties;
+
+ @Before
+ public void before() {
+ catalog = new SnowflakeCatalog();
+
+ fakeClient = new FakeSnowflakeClient();
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_1"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}"));
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_2"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab2/metadata/v1.metadata.json\",\"status\":\"success\"}"));
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_3"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}"));
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_4"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab4/metadata/v323.metadata.json\",\"status\":\"success\"}"));
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_3", "SCHEMA_3", "TAB_5"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}"));
+ fakeClient.addTable(
+ SnowflakeIdentifier.ofTable("DB_3", "SCHEMA_4", "TAB_6"),
+ SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"gcs://tab6/metadata/v123.metadata.json\",\"status\":\"success\"}"));
+
+ fakeFileIO = new InMemoryFileIO();
+
+ Schema schema =
+ new Schema(
+            Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"),
+            Types.NestedField.required(2, "y", Types.StringType.get(), "comment2"));
+ PartitionSpec partitionSpec =
+        PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build();
+ fakeFileIO.addFile(
+ "s3://tab1/metadata/v3.metadata.json",
+ TableMetadataParser.toJson(
+ TableMetadata.newTableMetadata(
+                schema, partitionSpec, "s3://tab1/", ImmutableMap.<String, String>of()))
+ .getBytes());
+ fakeFileIO.addFile(
+        "wasbs://[email protected]/tab3/metadata/v334.metadata.json",
+ TableMetadataParser.toJson(
+ TableMetadata.newTableMetadata(
+ schema,
+ partitionSpec,
+            "wasbs://[email protected]/tab1/",
+ ImmutableMap.<String, String>of()))
+ .getBytes());
+ fakeFileIO.addFile(
+ "gs://tab5/metadata/v793.metadata.json",
+ TableMetadataParser.toJson(
+ TableMetadata.newTableMetadata(
+                schema, partitionSpec, "gs://tab5/", ImmutableMap.<String, String>of()))
+ .getBytes());
+
+ fakeFileIOFactory =
+ new SnowflakeCatalog.FileIOFactory() {
+ @Override
+          public FileIO newFileIO(String impl, Map<String, String> prop, Object hadoopConf) {
+ return fakeFileIO;
+ }
+ };
+
+ properties = Maps.newHashMap();
+    catalog.initialize(TEST_CATALOG_NAME, fakeClient, fakeFileIOFactory, properties);
+ }
+
+ @Test
+ public void testInitializeNullClient() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+            () -> catalog.initialize(TEST_CATALOG_NAME, null, fakeFileIOFactory, properties))
+ .withMessageContaining("snowflakeClient must be non-null");
+ }
+
+ @Test
+ public void testInitializeNullFileIO() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.initialize(TEST_CATALOG_NAME, fakeClient, null, properties))
+ .withMessageContaining("fileIOFactory must be non-null");
+ }
+
+ @Test
+ public void testListNamespaceInRoot() {
+ Assertions.assertThat(catalog.listNamespaces())
+        .containsExactly(Namespace.of("DB_1"), Namespace.of("DB_2"), Namespace.of("DB_3"));
+ }
+
+ @Test
+ public void testListNamespaceWithinDB() {
+ String dbName = "DB_1";
+ Assertions.assertThat(catalog.listNamespaces(Namespace.of(dbName)))
+ .containsExactly(Namespace.of(dbName, "SCHEMA_1"));
+ }
+
+ @Test
+ public void testListNamespaceWithinNonExistentDB() {
+    // Existence check for nonexistent parent namespaces is optional in the SupportsNamespaces
+ // interface.
+ String dbName = "NONEXISTENT_DB";
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName)))
+ .withMessageContaining("does not exist")
+ .withMessageContaining(dbName);
+ }
+
+ @Test
+ public void testListNamespaceWithinSchema() {
+    // No "sub-namespaces" beyond database.schema; invalid to try to list namespaces given
+ // a database.schema.
+ String dbName = "DB_3";
+ String schemaName = "SCHEMA_4";
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName, schemaName)))
+ .withMessageContaining("level")
+ .withMessageContaining("DB_3.SCHEMA_4");
+ }
+
+ @Test
+ public void testListTables() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> catalog.listTables(Namespace.empty()))
+ .withMessageContaining("listTables must be at SCHEMA level");
+ }
+
+ @Test
+ public void testListTablesWithinDB() {
+ String dbName = "DB_1";
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> catalog.listTables(Namespace.of(dbName)))
+ .withMessageContaining("listTables must be at SCHEMA level");
+ }
+
+ @Test
+ public void testListTablesWithinNonexistentDB() {
+ String dbName = "NONEXISTENT_DB";
+ String schemaName = "NONEXISTENT_SCHEMA";
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName)))
+ .withMessageContaining("does not exist")
+ .withMessageContaining(dbName);
+ }
+
+ @Test
+ public void testListTablesWithinSchema() {
+ String dbName = "DB_2";
+ String schemaName = "SCHEMA_2";
+ Assertions.assertThat(catalog.listTables(Namespace.of(dbName, schemaName)))
+ .containsExactly(
+ TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_3"),
+ TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_4"));
+ }
+
+ @Test
+ public void testListTablesWithinNonexistentSchema() {
+ String dbName = "DB_2";
+ String schemaName = "NONEXISTENT_SCHEMA";
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName)))
+ .withMessageContaining("does not exist")
+ .withMessageContaining("DB_2.NONEXISTENT_SCHEMA");
+ }
+
+ @Test
+ public void testLoadS3Table() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1"), "TAB_1"));
+ Assertions.assertThat(table.location()).isEqualTo("s3://tab1/");
+ }
+
+ @Test
+ public void testLoadAzureTable() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_2", "SCHEMA_2"), "TAB_3"));
+ Assertions.assertThat(table.location())
+        .isEqualTo("wasbs://[email protected]/tab1/");
+ }
+
+ @Test
+ public void testLoadGcsTable() {
+    Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_3", "SCHEMA_3"), "TAB_5"));
+ Assertions.assertThat(table.location()).isEqualTo("gs://tab5/");
+ }
+
+ @Test
+ public void testLoadTableWithMalformedTableIdentifier() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ catalog.loadTable(
+ TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1", "BAD_NS_LEVEL"), "TAB_1")))
+ .withMessageContaining("level")
+ .withMessageContaining("DB_1.SCHEMA_1.BAD_NS_LEVEL");
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
catalog.loadTable(TableIdentifier.of(Namespace.of("DB_WITHOUT_SCHEMA"),
"TAB_1")))
+ .withMessageContaining("level")
+ .withMessageContaining("DB_WITHOUT_SCHEMA.TAB_1");
+ }
+
+ @Test
+ public void testCloseBeforeInitializeDoesntThrow() throws IOException {
+ catalog = new SnowflakeCatalog();
+
+ // Make sure no exception is thrown if we call close() before initialize(), in case callers
+ // add a catalog to auto-close() helpers but end up never using/initializing a catalog.
+ catalog.close();
+
+ Assertions.assertThat(fakeClient.isClosed())
+ .overridingErrorMessage("expected not to have called close() on snowflakeClient")
+ .isFalse();
+ }
+
+ @Test
+ public void testClose() throws IOException {
+ catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+ catalog.close();
+ Assertions.assertThat(fakeClient.isClosed())
+ .overridingErrorMessage("expected close() to propagate to snowflakeClient")
+ .isTrue();
+ Assertions.assertThat(fakeFileIO.isClosed())
+ .overridingErrorMessage("expected close() to propagate to fileIO")
+ .isTrue();
+ }
+
+ @Test
+ public void testTableNameFromTableOperations() {
+ SnowflakeTableOperations castedTableOps =
+ (SnowflakeTableOperations)
+ catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+ Assertions.assertThat(castedTableOps.fullTableName()).isEqualTo("slushLog.DB_1.SCHEMA_1.TAB_1");
+ }
+
+ @Test
+ public void testDatabaseExists() {
+ Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1"))).isTrue();
+ Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB"))).isFalse();
+ }
+
+ @Test
+ public void testSchemaExists() {
+ Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "SCHEMA_1"))).isTrue();
+ Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "NONEXISTENT_SCHEMA")))
+ .isFalse();
+ Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB", "SCHEMA_1")))
+ .isFalse();
+ }
+}
diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index eca34afcbd..c7861d36e5 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -213,6 +213,9 @@ project(':iceberg-spark:iceberg-spark-runtime-3.1_2.12') {
implementation(project(':iceberg-nessie')) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation (project(':iceberg-snowflake')) {
+ exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+ }
integrationImplementation "org.apache.spark:spark-hive_2.12:${sparkVersion}"
integrationImplementation 'org.junit.vintage:junit-vintage-engine'
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 62a51ae494..c7bef9928d 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -224,6 +224,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
implementation(project(':iceberg-nessie')) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation (project(':iceberg-snowflake')) {
+ exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+ }
integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle
index 56a6fe7f51..3f7cf71b74 100644
--- a/spark/v3.3/build.gradle
+++ b/spark/v3.3/build.gradle
@@ -216,6 +216,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
implementation(project(':iceberg-nessie')) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation (project(':iceberg-snowflake')) {
+ exclude group: 'net.snowflake' , module: 'snowflake-jdbc'
+ }
integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/versions.props b/versions.props
index d007ff43ac..670487b997 100644
--- a/versions.props
+++ b/versions.props
@@ -28,6 +28,7 @@ org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0
org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0
com.emc.ecs:object-client-bundle = 3.3.2
org.immutables:value = 2.9.2
+net.snowflake:snowflake-jdbc = 3.13.22
# test deps
org.junit.vintage:junit-vintage-engine = 5.8.2
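With the iceberg-snowflake module bundled into the Spark runtime jars above, the new catalog can be wired up through Spark's standard Iceberg catalog configuration. A minimal sketch (the catalog name `snowflake`, the account host, and the runtime version are placeholders; per the commit description, connection options such as `uri` are passed through to the Snowflake JDBC driver):

```shell
spark-sql \
  --conf spark.sql.catalog.snowflake=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.snowflake.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog \
  --conf spark.sql.catalog.snowflake.uri='jdbc:snowflake://<account>.snowflakecomputing.com'
```

Since this initial implementation is read-only, such a catalog supports listing namespaces/tables and reading Snowflake-managed Iceberg tables, but not DDL or writes.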