This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 97a49e0ec Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303)
97a49e0ec is described below
commit 97a49e0ec3910e61d1d55d3a06fc01c3c5724bf7
Author: gh-yzou <[email protected]>
AuthorDate: Fri Apr 11 10:56:37 2025 -0700
Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303)
---
plugins/pluginlibs.versions.toml | 3 +
plugins/spark/v3.5/build.gradle.kts | 73 +++++++-
.../org/apache/polaris/spark/PolarisCatalog.java | 36 ++++
.../apache/polaris/spark/PolarisRESTCatalog.java | 184 ++++++++++++++++++++
.../apache/polaris/spark/PolarisSparkCatalog.java | 111 +++++++++++++
.../org/apache/polaris/spark/SparkCatalog.java | 103 ++++++++++--
.../spark/rest/CreateGenericTableRESTRequest.java | 46 +++++
.../spark/rest/LoadGenericTableRESTResponse.java | 42 +++++
.../apache/polaris/spark/utils/DeltaHelper.java | 107 ++++++++++++
.../polaris/spark/utils/PolarisCatalogUtils.java | 112 +++++++++++++
.../org/apache/polaris/spark/NoopDeltaCatalog.java | 32 ++++
.../polaris/spark/PolarisInMemoryCatalog.java | 90 ++++++++++
.../org/apache/polaris/spark/SparkCatalogTest.java | 185 +++++++++++++++++++--
.../polaris/spark/rest/DeserializationTest.java | 88 ++++++++++
14 files changed, 1175 insertions(+), 37 deletions(-)
diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml
index 0a4a515e5..e48f6ef45 100644
--- a/plugins/pluginlibs.versions.toml
+++ b/plugins/pluginlibs.versions.toml
@@ -20,3 +20,6 @@
[versions]
iceberg = "1.8.1"
spark35 = "3.5.5"
+scala212 = "2.12.19"
+scala213 = "2.13.15"
+
diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/build.gradle.kts
index 36ca6d528..df37fa229 100644
--- a/plugins/spark/v3.5/build.gradle.kts
+++ b/plugins/spark/v3.5/build.gradle.kts
@@ -41,18 +41,34 @@ val scalaVersion = getAndUseScalaVersionForProject()
val icebergVersion = pluginlibs.versions.iceberg.get()
val spark35Version = pluginlibs.versions.spark35.get()
+val scalaLibraryVersion =
+ if (scalaVersion == "2.12") {
+ pluginlibs.versions.scala212.get()
+ } else {
+ pluginlibs.versions.scala213.get()
+ }
+
dependencies {
implementation(project(":polaris-api-iceberg-service")) {
- // exclude the iceberg and jackson dependencies, use the
- // dependencies packed in the iceberg-spark dependency
+ // exclude the iceberg dependencies, use the ones pulled
+ // by iceberg-core
exclude("org.apache.iceberg", "*")
- exclude("com.fasterxml.jackson.core", "*")
}
+ implementation(project(":polaris-api-catalog-service"))
+ implementation(project(":polaris-core")) { exclude("org.apache.iceberg",
"*") }
+
+ implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
implementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
- )
+ ) {
+ // exclude the iceberg rest dependencies, use the ones pulled
+ // with iceberg-core dependency
+ exclude("org.apache.iceberg", "iceberg-core")
+ }
+ compileOnly("org.scala-lang:scala-library:${scalaLibraryVersion}")
+ compileOnly("org.scala-lang:scala-reflect:${scalaLibraryVersion}")
compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
@@ -78,24 +94,65 @@ dependencies {
}
}
+// TODO: replace this check with the checkstyle gradle plugin
+tasks.register("checkNoDisallowedImports") {
+ doLast {
+ // List of disallowed imports. Right now, we disallow usage of shaded or
+ // relocated libraries in the iceberg spark runtime jar.
+ val disallowedImports =
+ listOf("import org.apache.iceberg.shaded.",
"org.apache.iceberg.relocated.")
+
+ // Directory to scan for Java files
+ val sourceDirs = listOf(file("src/main/java"), file("src/test/java"))
+
+ val violations = mutableListOf<String>()
+ // Scan Java files in each directory
+ sourceDirs.forEach { sourceDir ->
+ fileTree(sourceDir)
+ .matching {
+ include("**/*.java") // Only include Java files
+ }
+ .forEach { file ->
+ val content = file.readText()
+ disallowedImports.forEach { importStatement ->
+ if (content.contains(importStatement)) {
+ violations.add(
+ "Disallowed import found in ${file.relativeTo(projectDir)}:
$importStatement"
+ )
+ }
+ }
+ }
+ }
+
+ if (violations.isNotEmpty()) {
+ throw GradleException("Disallowed imports found! $violations")
+ }
+ }
+}
+
+tasks.named("check") { dependsOn("checkNoDisallowedImports") }
+
tasks.register<ShadowJar>("createPolarisSparkJar") {
archiveClassifier = null
archiveBaseName =
"polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
isZip64 = true
- dependencies { exclude("META-INF/**") }
+ mergeServiceFiles()
// pack both the source code and dependencies
from(sourceSets.main.get().output)
configurations = listOf(project.configurations.runtimeClasspath.get())
- mergeServiceFiles()
-
// Optimization: Minimize the JAR (remove unused classes from dependencies)
+ // The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin,
// therefore excluded from the optimization.
- minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+ minimize {
+ exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*"))
+ exclude(dependency("org.apache.iceberg:iceberg-core*.*"))
+ }
+
+ relocate("com.fasterxml", "org.apache.polaris.shaded.com.fasterxml.jackson")
}
tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
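For context, a minimal sketch of how the shaded jar produced by createPolarisSparkJar might be wired into a Spark session. The catalog name "polaris", the endpoint URI, and the warehouse value below are illustrative placeholders, not part of this commit; the jar is assumed to be on the driver and executor classpath:

    import org.apache.spark.sql.SparkSession;

    public class PolarisSparkQuickstart {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[1]")
                .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
                // only the "rest" type is accepted; see validateAndResolveCatalogOptions below
                .config("spark.sql.catalog.polaris.type", "rest")
                .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog") // placeholder
                .config("spark.sql.catalog.polaris.warehouse", "my_catalog") // placeholder
                .getOrCreate();
        spark.sql("SHOW NAMESPACES IN polaris").show();
        spark.stop();
      }
    }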
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java
new file mode 100644
index 000000000..31a6ac189
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.service.types.GenericTable;
+
+public interface PolarisCatalog {
+ List<TableIdentifier> listGenericTables(Namespace ns);
+
+ GenericTable loadGenericTable(TableIdentifier identifier);
+
+ boolean dropGenericTable(TableIdentifier identifier);
+
+ GenericTable createGenericTable(
+ TableIdentifier identifier, String format, String doc, Map<String, String> props);
+}
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
new file mode 100644
index 000000000..0b8743132
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+
+/**
+ * [[PolarisRESTCatalog]] talks to the Polaris REST APIs and implements the PolarisCatalog
+ * interface, which currently covers only the generic-table APIs. This class doesn't interact
+ * with any Spark objects.
+ */
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths pathGenerator = null;
+
+ // the default endpoints to use if the server doesn't specify the 'endpoints' configuration.
+ private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+ public PolarisRESTCatalog() {
+ this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ }
+
+ public PolarisRESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+ this.clientBuilder = clientBuilder;
+ }
+
+ public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession catalogAuth) {
+ Preconditions.checkArgument(unresolved != null, "Invalid configuration: null");
+
+ // Resolve any configuration that is supplied by environment variables.
+ // For example: if the unresolved map contains an entry ("key", "env:envVar") and envVar is
+ // set to envValue in the system environment, the resolved map contains ("key", "envValue").
+ Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+ // TODO: switch to use authManager once iceberg dependency is updated to 1.9.0
+ this.catalogAuth = catalogAuth;
+
+ ConfigResponse config;
+ try (RESTClient initClient = clientBuilder.apply(props).withAuthSession(catalogAuth)) {
+ config = fetchConfig(initClient, catalogAuth.headers(), props);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close HTTP client", e);
+ }
+
+ // call getConfig to get the server configurations
+ Map<String, String> mergedProps = config.merge(props);
+ if (config.endpoints().isEmpty()) {
+ this.endpoints = DEFAULT_ENDPOINTS;
+ } else {
+ this.endpoints = ImmutableSet.copyOf(config.endpoints());
+ }
+
+ this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
+ this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+
+ this.closeables = new CloseableGroup();
+ this.closeables.addCloseable(this.restClient);
+ this.closeables.setSuppressCloseFailure(true);
+ }
+
+ protected static ConfigResponse fetchConfig(
+ RESTClient client, Map<String, String> headers, Map<String, String> properties) {
+ // send the client's warehouse location to the service to keep them in sync.
+ // This is needed for cases where the warehouse is configured on the client side
+ // and used by the Polaris server as the catalog name.
+ ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder();
+ if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+ queryParams.put(
+ CatalogProperties.WAREHOUSE_LOCATION,
+ properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+ }
+
+ ConfigResponse configResponse =
+ client.get(
+ ResourcePaths.config(),
+ queryParams.build(),
+ ConfigResponse.class,
+ headers,
+ ErrorHandlers.defaultErrorHandler());
+ configResponse.validate();
+ return configResponse;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closeables != null) {
+ closeables.close();
+ }
+ }
+
+ @Override
+ public List<TableIdentifier> listGenericTables(Namespace ns) {
+ throw new UnsupportedOperationException("listTables not supported");
+ }
+
+ @Override
+ public boolean dropGenericTable(TableIdentifier identifier) {
+ throw new UnsupportedOperationException("dropTable not supported");
+ }
+
+ @Override
+ public GenericTable createGenericTable(
+ TableIdentifier identifier, String format, String doc, Map<String, String> props) {
+ Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
+ CreateGenericTableRESTRequest request =
+ new CreateGenericTableRESTRequest(identifier.name(), format, doc, props);
+
+ LoadGenericTableRESTResponse response =
+ restClient
+ .withAuthSession(this.catalogAuth)
+ .post(
+ pathGenerator.genericTables(identifier.namespace()),
+ request,
+ LoadGenericTableRESTResponse.class,
+ Map.of(),
+ ErrorHandlers.tableErrorHandler());
+
+ return response.getTable();
+ }
+
+ @Override
+ public GenericTable loadGenericTable(TableIdentifier identifier) {
+ Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE);
+ LoadGenericTableRESTResponse response =
+ restClient
+ .withAuthSession(this.catalogAuth)
+ .get(
+ pathGenerator.genericTable(identifier),
+ null,
+ LoadGenericTableRESTResponse.class,
+ Map.of(),
+ ErrorHandlers.tableErrorHandler());
+
+ return response.getTable();
+ }
+}
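A short usage sketch for the class above. The endpoint value and the pre-built authSession are assumptions for illustration; in the plugin, initialization is driven by SparkCatalog.initRESTCatalog and the auth session is extracted by PolarisCatalogUtils.getAuthSession:

    // Sketch only: assumes a reachable Polaris server and an existing
    // OAuth2Util.AuthSession named authSession.
    Map<String, String> props =
        Map.of(CatalogProperties.URI, "http://localhost:8181/api/catalog"); // placeholder endpoint

    PolarisRESTCatalog catalog = new PolarisRESTCatalog();
    catalog.initialize(props, authSession);

    GenericTable created =
        catalog.createGenericTable(
            TableIdentifier.of(Namespace.of("ns"), "events"), // hypothetical identifier
            "delta",
            null,
            Map.of("location", "s3://bucket/path/")); // hypothetical properties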
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
new file mode 100644
index 000000000..8f8c07fba
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark TableCatalog implementation that interacts with Polaris-specific APIs only. The APIs
+ * it uses are the generic-table APIs, and all table operations performed through this class are
+ * expected to be for non-Iceberg tables.
+ */
+public class PolarisSparkCatalog implements TableCatalog {
+
+ private PolarisCatalog polarisCatalog = null;
+ private String catalogName = null;
+
+ public PolarisSparkCatalog(PolarisCatalog polarisCatalog) {
+ this.polarisCatalog = polarisCatalog;
+ }
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.catalogName = name;
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public Table loadTable(Identifier identifier) throws NoSuchTableException {
+ try {
+ GenericTable genericTable =
+ this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
+ return PolarisCatalogUtils.loadSparkTable(genericTable);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(identifier);
+ }
+ }
+
+ @Override
+ public Table createTable(
+ Identifier identifier,
+ StructType schema,
+ Transform[] transforms,
+ Map<String, String> properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ try {
+ String format = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+ GenericTable genericTable =
+ this.polarisCatalog.createGenericTable(
+ Spark3Util.identifierToTableIdentifier(identifier), format, null, properties);
+ return PolarisCatalogUtils.loadSparkTable(genericTable);
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistsException(identifier);
+ }
+ }
+
+ @Override
+ public Table alterTable(Identifier identifier, TableChange... changes)
+ throws NoSuchTableException {
+ throw new NoSuchTableException(identifier);
+ }
+
+ @Override
+ public boolean dropTable(Identifier identifier) {
+ return false;
+ }
+
+ @Override
+ public void renameTable(Identifier from, Identifier to)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ throw new UnsupportedOperationException("renameTable operation is not
supported");
+ }
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ throw new UnsupportedOperationException("listTables operation is not
supported");
+ }
+}
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index e38bbe1ad..cf46d9a15 100644
--- a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -18,10 +18,17 @@
*/
package org.apache.polaris.spark;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import java.util.Map;
-import java.util.Set;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.spark.SupportsReplaceView;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -42,42 +49,114 @@ import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * A SparkCatalog implementation that is able to interact with both the Iceberg SparkCatalog and
+ * the Polaris SparkCatalog. All namespace and view related operations continue to go through the
+ * Iceberg SparkCatalog. For table operations, depending on the table format, the operation may
+ * involve interaction with both the Iceberg and the Polaris SparkCatalog.
+ */
public class SparkCatalog
implements StagingTableCatalog,
TableCatalog,
SupportsNamespaces,
ViewCatalog,
SupportsReplaceView {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
- private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
- private String catalogName = null;
- private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
-
- // TODO: Add Polaris Specific REST Catalog
+ @VisibleForTesting protected String catalogName = null;
+ @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
+ @VisibleForTesting protected DeltaHelper deltaHelper = null;
@Override
public String name() {
return catalogName;
}
+ /**
+ * Check whether an invalid catalog configuration is provided, and return an options map with the
+ * catalog type configured correctly. This function mainly validates two things: 1) no customized
+ * catalog implementation is provided, and 2) no non-REST catalog type is configured.
+ */
+ @VisibleForTesting
+ public CaseInsensitiveStringMap validateAndResolveCatalogOptions(
+ CaseInsensitiveStringMap options) {
+ Preconditions.checkArgument(
+ options.get(CatalogProperties.CATALOG_IMPL) == null,
+ "Customized catalog implementation is not supported and not needed,
please remove the configuration!");
+
+ String catalogType =
+ PropertyUtil.propertyAsString(
+ options, CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+ Preconditions.checkArgument(
+ catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST),
+ "Only rest catalog type is allowed, but got catalog type: "
+ + catalogType
+ + ". Either configure the type to rest or remove the config");
+
+ Map<String, String> resolvedOptions = Maps.newHashMap();
+ resolvedOptions.putAll(options);
+ // when no catalog type is configured, iceberg uses hive by default. Here, we make sure the
+ // type is set to rest since we only support rest catalog.
+ resolvedOptions.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+
+ return new CaseInsensitiveStringMap(resolvedOptions);
+ }
+
+ /**
+ * Initialize the REST catalogs for Iceberg and Polaris; this is the only catalog type supported
+ * by Polaris at the moment.
+ */
+ private void initRESTCatalog(String name, CaseInsensitiveStringMap options) {
+ CaseInsensitiveStringMap resolvedOptions = validateAndResolveCatalogOptions(options);
+
+ // initialize the icebergSparkCatalog
+ this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
+ this.icebergsSparkCatalog.initialize(name, resolvedOptions);
+
+ // initialize the polaris spark catalog
+ OAuth2Util.AuthSession catalogAuth =
+ PolarisCatalogUtils.getAuthSession(this.icebergsSparkCatalog);
+ PolarisRESTCatalog restCatalog = new PolarisRESTCatalog();
+ restCatalog.initialize(options, catalogAuth);
+ this.polarisSparkCatalog = new PolarisSparkCatalog(restCatalog);
+ this.polarisSparkCatalog.initialize(name, resolvedOptions);
+ }
+
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
- this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
- this.icebergsSparkCatalog.initialize(name, options);
+ initRESTCatalog(name, options);
+ this.deltaHelper = new DeltaHelper(options);
}
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
- throw new UnsupportedOperationException("loadTable");
+ try {
+ return this.icebergsSparkCatalog.loadTable(ident);
+ } catch (NoSuchTableException e) {
+ return this.polarisSparkCatalog.loadTable(ident);
+ }
}
@Override
public Table createTable(
Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
- throws TableAlreadyExistsException {
- throw new UnsupportedOperationException("createTable");
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+ if (PolarisCatalogUtils.useIceberg(provider)) {
+ return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties);
+ } else if (PolarisCatalogUtils.useDelta(provider)) {
+ // For a delta table, we load the delta catalog to help deal with the
+ // delta log creation.
+ TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+ return deltaCatalog.createTable(ident, schema, transforms, properties);
+ } else {
+ return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+ }
}
@Override
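To illustrate the dispatch in createTable above: the table provider from the USING clause decides which catalog handles the operation. A sketch, assuming a SparkSession named spark with this catalog registered under the illustrative name "polaris":

    // Provider iceberg (or no provider at all) goes to the Iceberg SparkCatalog.
    spark.sql("CREATE TABLE polaris.ns.ice_tbl (id INT) USING iceberg");

    // Provider delta goes through the loaded delta catalog, with
    // PolarisSparkCatalog set as its delegate for the generic-table calls.
    spark.sql("CREATE TABLE polaris.ns.delta_tbl (id INT) USING delta LOCATION '/tmp/delta_tbl'");

    // Any other provider is created directly as a generic table via PolarisSparkCatalog.
    spark.sql("CREATE TABLE polaris.ns.csv_tbl (id INT) USING csv LOCATION '/tmp/csv_tbl'");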
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java
new file mode 100644
index 000000000..4ec348a80
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import org.apache.iceberg.rest.RESTRequest;
+import org.apache.polaris.service.types.CreateGenericTableRequest;
+
+/**
+ * RESTRequest definition for CreateGenericTable, which extends the Iceberg RESTRequest. This is
+ * currently required because the Iceberg HTTPClient requires the request and response to be
+ * instances of RESTRequest and RESTResponse.
+ */
+public class CreateGenericTableRESTRequest extends CreateGenericTableRequest
+ implements RESTRequest {
+
+ @JsonCreator
+ public CreateGenericTableRESTRequest(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty(value = "format", required = true) String format,
+ @JsonProperty(value = "doc") String doc,
+ @JsonProperty(value = "properties") Map<String, String> properties) {
+ super(name, format, doc, properties);
+ }
+
+ @Override
+ public void validate() {}
+}
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java
new file mode 100644
index 000000000..68c738dae
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.service.types.LoadGenericTableResponse;
+
+/**
+ * RESTResponse definition for LoadGenericTable, which extends the Iceberg RESTResponse. This is
+ * currently required because the Iceberg HTTPClient requires the request and response to be
+ * instances of RESTRequest and RESTResponse.
+ */
+public class LoadGenericTableRESTResponse extends LoadGenericTableResponse implements RESTResponse {
+
+ @JsonCreator
+ public LoadGenericTableRESTResponse(
+ @JsonProperty(value = "table", required = true) GenericTable table) {
+ super(table);
+ }
+
+ @Override
+ public void validate() {}
+}
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
new file mode 100644
index 000000000..297438424
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.polaris.spark.PolarisSparkCatalog;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeltaHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class);
+
+ public static final String DELTA_CATALOG_IMPL_KEY = "delta-catalog-impl";
+ private static final String DEFAULT_DELTA_CATALOG_CLASS =
+ "org.apache.spark.sql.delta.catalog.DeltaCatalog";
+
+ private TableCatalog deltaCatalog = null;
+ private String deltaCatalogImpl = DEFAULT_DELTA_CATALOG_CLASS;
+
+ public DeltaHelper(CaseInsensitiveStringMap options) {
+ if (options.get(DELTA_CATALOG_IMPL_KEY) != null) {
+ this.deltaCatalogImpl = options.get(DELTA_CATALOG_IMPL_KEY);
+ }
+ }
+
+ public TableCatalog loadDeltaCatalog(PolarisSparkCatalog polarisSparkCatalog) {
+ if (this.deltaCatalog != null) {
+ return this.deltaCatalog;
+ }
+
+ DynConstructors.Ctor<TableCatalog> ctor;
+ try {
+ ctor = DynConstructors.builder(TableCatalog.class).impl(deltaCatalogImpl).buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot initialize Delta Catalog %s: %s",
deltaCatalogImpl, e.getMessage()),
+ e);
+ }
+
+ try {
+ this.deltaCatalog = ctor.newInstance();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot initialize Delta Catalog, %s does not implement Table
Catalog.",
+ deltaCatalogImpl),
+ e);
+ }
+
+ // set the polaris spark catalog as the delegate catalog of delta catalog
+ ((DelegatingCatalogExtension) this.deltaCatalog).setDelegateCatalog(polarisSparkCatalog);
+
+ // We want to behave exactly the same as unity catalog for Delta. However, the DeltaCatalog
+ // implementation today is hard-coded for unity catalog. The following issue tracks extending
+ // that usage: https://github.com/delta-io/delta/issues/4306.
+ // Here, we use reflection to set isUnityCatalog to true to get exactly the same behavior as
+ // unity catalog for now.
+ try {
+ // isUnityCatalog is a lazy val; access the compute method for the lazy val to
+ // make sure the method is triggered before the value is set, otherwise the
+ // value will be overwritten later when the method is triggered.
+ String methodGetName = "isUnityCatalog" + "$lzycompute";
+ Method method = this.deltaCatalog.getClass().getDeclaredMethod(methodGetName);
+ method.setAccessible(true);
+ // invoke the lazy methods before it is set
+ method.invoke(this.deltaCatalog);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("No lazy compute method found for variable isUnityCatalog");
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke the lazy compute methods
for isUnityCatalog", e);
+ }
+
+ try {
+ Field field = this.deltaCatalog.getClass().getDeclaredField("isUnityCatalog");
+ field.setAccessible(true);
+ field.set(this.deltaCatalog, true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(
+ "Failed find the isUnityCatalog field, delta-spark version >= 3.2.1
is required", e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Failed to set the isUnityCatalog field", e);
+ }
+
+ return this.deltaCatalog;
+ }
+}
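The delta-catalog-impl option read in the constructor above lets callers substitute the DeltaCatalog class; the test-only NoopDeltaCatalog later in this commit relies on it. A sketch, where com.example.MyDeltaCatalog is a hypothetical class name:

    Map<String, String> options = Maps.newHashMap();
    // The class must implement TableCatalog and extend DelegatingCatalogExtension,
    // since loadDeltaCatalog casts it to set the delegate catalog.
    options.put(DeltaHelper.DELTA_CATALOG_IMPL_KEY, "com.example.MyDeltaCatalog");

    DeltaHelper helper = new DeltaHelper(new CaseInsensitiveStringMap(options));
    TableCatalog deltaCatalog = helper.loadDeltaCatalog(polarisSparkCatalog); // cached after first call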
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
new file mode 100644
index 000000000..01a4af45d
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTSessionCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class PolarisCatalogUtils {
+ public static final String TABLE_PROVIDER_KEY = "provider";
+ public static final String TABLE_PATH_KEY = "path";
+
+ /** Check whether the table provider is iceberg. */
+ public static boolean useIceberg(String provider) {
+ return provider == null || "iceberg".equalsIgnoreCase(provider);
+ }
+
+ /** Check whether the table provider is delta. */
+ public static boolean useDelta(String provider) {
+ return "delta".equalsIgnoreCase(provider);
+ }
+
+ /**
+ * Load spark table using DataSourceV2.
+ *
+ * @return V2Table if DataSourceV2 is available for the table format. For delta table, it
+ *     returns DeltaTableV2.
+ */
+ public static Table loadSparkTable(GenericTable genericTable) {
+ SparkSession sparkSession = SparkSession.active();
+ TableProvider provider =
+ DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
+ .get();
+ Map<String, String> properties = genericTable.getProperties();
+ boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
+ boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.putAll(properties);
+ if (!hasPathClause && hasLocationClause) {
+ // DataSourceV2 requires the path property on table loading. However, spark today
+ // doesn't create the corresponding path property if the path keyword is not
+ // provided by the user when a location is provided. Here, we duplicate the location
+ // property as path to make sure the table can be loaded.
+ tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
+ }
+ return DataSourceV2Utils.getTableFromProvider(
+ provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
+ }
+
+ /**
+ * Get the catalogAuth field inside the RESTSessionCatalog used by the Iceberg Spark Catalog
+ * using reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0,
+ * when AuthManager and the capability of injecting an AuthManager are available. Related
+ * iceberg PR: https://github.com/apache/iceberg/pull/12655
+ */
+ public static OAuth2Util.AuthSession getAuthSession(SparkCatalog sparkCatalog) {
+ try {
+ Field icebergCatalogField = sparkCatalog.getClass().getDeclaredField("icebergCatalog");
+ icebergCatalogField.setAccessible(true);
+ Catalog icebergCatalog = (Catalog) icebergCatalogField.get(sparkCatalog);
+ RESTCatalog icebergRestCatalog;
+ if (icebergCatalog instanceof CachingCatalog) {
+ Field catalogField = icebergCatalog.getClass().getDeclaredField("catalog");
+ catalogField.setAccessible(true);
+ icebergRestCatalog = (RESTCatalog) catalogField.get(icebergCatalog);
+ } else {
+ icebergRestCatalog = (RESTCatalog) icebergCatalog;
+ }
+
+ Field sessionCatalogField = icebergRestCatalog.getClass().getDeclaredField("sessionCatalog");
+ sessionCatalogField.setAccessible(true);
+ RESTSessionCatalog sessionCatalog =
+ (RESTSessionCatalog) sessionCatalogField.get(icebergRestCatalog);
+
+ Field authField = sessionCatalog.getClass().getDeclaredField("catalogAuth");
+ authField.setAccessible(true);
+ return (OAuth2Util.AuthSession) authField.get(sessionCatalog);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get the catalogAuth from the
Iceberg SparkCatalog", e);
+ }
+ }
+}
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
new file mode 100644
index 000000000..c11e8de3b
--- /dev/null
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+
+/**
+ * This is a fake delta catalog class that is used for testing. This class is a noop class that
+ * directly passes all calls to the delegate CatalogPlugin configured as part of
+ * DelegatingCatalogExtension.
+ */
+public class NoopDeltaCatalog extends DelegatingCatalogExtension {
+ // This is a mock of isUnityCatalog scala val in
+ // org.apache.spark.sql.delta.catalog.DeltaCatalog.
+ private boolean isUnityCatalog = false;
+}
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
new file mode 100644
index 000000000..5c3d59710
--- /dev/null
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.Maps;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.polaris.service.types.GenericTable;
+
+/** InMemory implementation of the Polaris Catalog. This class is mainly used for testing. */
+public class PolarisInMemoryCatalog extends InMemoryCatalog implements PolarisCatalog {
+ private final ConcurrentMap<TableIdentifier, GenericTable> genericTables;
+
+ public PolarisInMemoryCatalog() {
+ this.genericTables = Maps.newConcurrentMap();
+ }
+
+ @Override
+ public List<TableIdentifier> listGenericTables(Namespace ns) {
+ return this.genericTables.keySet().stream()
+ .filter(t -> t.namespace().equals(ns))
+ .sorted(Comparator.comparing(TableIdentifier::toString))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public GenericTable loadGenericTable(TableIdentifier identifier) {
+ GenericTable table = this.genericTables.get(identifier);
+ if (table == null) {
+ throw new NoSuchTableException("Generic table does not exist: %s", identifier);
+ }
+
+ return table;
+ }
+
+ @Override
+ public boolean dropGenericTable(TableIdentifier identifier) {
+ return null != this.genericTables.remove(identifier);
+ }
+
+ @Override
+ public GenericTable createGenericTable(
+ TableIdentifier identifier, String format, String doc, Map<String, String> props) {
+ if (!namespaceExists(identifier.namespace())) {
+ throw new NoSuchNamespaceException(
+ "Cannot create generic table %s. Namespace does not exist: %s",
+ identifier, identifier.namespace());
+ }
+
+ GenericTable previous =
+ this.genericTables.putIfAbsent(
+ identifier,
+ GenericTable.builder()
+ .setName(identifier.name())
+ .setFormat(format)
+ .setProperties(props)
+ .build());
+
+ if (previous != null) {
+ throw new AlreadyExistsException("Generic table already exists: %s", identifier);
+ }
+
+ return this.genericTables.get(identifier);
+ }
+}
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 70e9b00c5..0d142cbcb 100644
--- a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -19,7 +19,6 @@
package org.apache.polaris.spark;
import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
-import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -28,32 +27,85 @@ import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
-import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.catalog.V1Table;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import scala.Option;
public class SparkCatalogTest {
- private SparkCatalog catalog;
+ private static class InMemoryIcebergSparkCatalog extends org.apache.iceberg.spark.SparkCatalog {
+ private PolarisInMemoryCatalog inMemoryCatalog = null;
+
+ @Override
+ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
+ PolarisInMemoryCatalog inMemoryCatalog = new PolarisInMemoryCatalog();
+ inMemoryCatalog.initialize(name, options);
+
+ this.inMemoryCatalog = inMemoryCatalog;
+
+ return inMemoryCatalog;
+ }
+
+ public PolarisInMemoryCatalog getInMemoryCatalog() {
+ return this.inMemoryCatalog;
+ }
+ }
+
+ /**
+ * A SparkCatalog implementation that uses an InMemory catalog implementation for both Iceberg
+ * and Polaris.
+ */
+ private static class InMemorySparkCatalog extends SparkCatalog {
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.catalogName = name;
+ // initialize the InMemory icebergSparkCatalog
+ this.icebergsSparkCatalog = new InMemoryIcebergSparkCatalog();
+ this.icebergsSparkCatalog.initialize(name, options);
+
+ // initialize the polarisSparkCatalog with PolarisSparkCatalog
+ this.polarisSparkCatalog =
+ new PolarisSparkCatalog(
+ ((InMemoryIcebergSparkCatalog) this.icebergsSparkCatalog).getInMemoryCatalog());
+ this.polarisSparkCatalog.initialize(name, options);
+
+ this.deltaHelper = new DeltaHelper(options);
+ }
+ }
+
+ private InMemorySparkCatalog catalog;
private String catalogName;
private static final String[] defaultNS = new String[] {"ns"};
- private static final Schema defaultSchema =
- new Schema(
- 5,
- required(3, "id", Types.IntegerType.get(), "unique ID"),
- required(4, "data", Types.StringType.get()));
@BeforeEach
public void setup() throws Exception {
@@ -61,8 +113,9 @@ public class SparkCatalogTest {
Map<String, String> catalogConfig = Maps.newHashMap();
catalogConfig.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog");
catalogConfig.put("cache-enabled", "false");
-
- catalog = new SparkCatalog();
+ catalogConfig.put(
+ DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog");
+ catalog = new InMemorySparkCatalog();
Configuration conf = new Configuration();
try (MockedStatic<SparkSession> mockedStaticSparkSession =
Mockito.mockStatic(SparkSession.class);
@@ -83,6 +136,34 @@ public class SparkCatalogTest {
catalog.createNamespace(defaultNS, Maps.newHashMap());
}
+ @Test
+ void testCatalogValidation() {
+ Map<String, String> catalogConfigWithImpl = Maps.newHashMap();
+ catalogConfigWithImpl.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog");
+ catalogConfigWithImpl.put("cache-enabled", "false");
+ SparkCatalog testCatalog = new SparkCatalog();
+ assertThatThrownBy(
+ () ->
+ testCatalog.validateAndResolveCatalogOptions(
+ new CaseInsensitiveStringMap(catalogConfigWithImpl)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Customized catalog implementation is not
supported and not needed");
+
+ Map<String, String> catalogConfigInvalidType = Maps.newHashMap();
+ catalogConfigInvalidType.put(CatalogUtil.ICEBERG_CATALOG_TYPE, "hive");
+ assertThatThrownBy(
+ () ->
+ testCatalog.validateAndResolveCatalogOptions(
+ new CaseInsensitiveStringMap(catalogConfigInvalidType)))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ CaseInsensitiveStringMap resolvedMap =
+ testCatalog.validateAndResolveCatalogOptions(
+ new CaseInsensitiveStringMap(Maps.newHashMap()));
+ assertThat(resolvedMap.get(CatalogUtil.ICEBERG_CATALOG_TYPE))
+ .isEqualTo(CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+ }
+
@Test
void testCreateAndLoadNamespace() throws Exception {
String[] namespace = new String[] {"ns1"};
@@ -286,17 +367,87 @@ public class SparkCatalogTest {
}
}
+ @Test
+ void testCreateAndLoadIcebergTable() throws Exception {
+ Identifier identifier = Identifier.of(defaultNS, "iceberg-table");
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
+ properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+ StructType schema = new StructType().add("boolType", "boolean");
+
+ Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
+ assertThat(createdTable).isInstanceOf(SparkTable.class);
+
+ // load the table
+ Table table = catalog.loadTable(identifier);
+ // verify iceberg SparkTable is loaded
+ assertThat(table).isInstanceOf(SparkTable.class);
+
+ // verify create table with the same identifier fails with spark TableAlreadyExistsException
+ StructType newSchema = new StructType().add("LongType", "Long");
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
+ newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+ assertThatThrownBy(
+ () -> catalog.createTable(identifier, newSchema, new Transform[0],
newProperties))
+ .isInstanceOf(TableAlreadyExistsException.class);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"delta", "csv"})
+ void testCreateAndLoadGenericTable(String format) throws Exception {
+ Identifier identifier = Identifier.of(defaultNS, "generic-test-table");
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
+ properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/");
+ StructType schema = new StructType().add("boolType", "boolean");
+
+ SQLConf conf = new SQLConf();
+ try (MockedStatic<SparkSession> mockedStaticSparkSession =
+ Mockito.mockStatic(SparkSession.class);
+ MockedStatic<DataSource> mockedStaticDS =
Mockito.mockStatic(DataSource.class);
+ MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
+ Mockito.mockStatic(DataSourceV2Utils.class)) {
+ SparkSession mockedSession = Mockito.mock(SparkSession.class);
+ mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+ SessionState mockedState = Mockito.mock(SessionState.class);
+ Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
+ Mockito.when(mockedState.conf()).thenReturn(conf);
+
+ TableProvider provider = Mockito.mock(TableProvider.class);
+ mockedStaticDS
+ .when(() -> DataSource.lookupDataSourceV2(Mockito.eq(format),
Mockito.any()))
+ .thenReturn(Option.apply(provider));
+ V1Table table = Mockito.mock(V1Table.class);
+ mockedStaticDSV2
+ .when(
+ () ->
+ DataSourceV2Utils.getTableFromProvider(
+ Mockito.eq(provider), Mockito.any(), Mockito.any()))
+ .thenReturn(table);
+ Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
+ assertThat(createdTable).isInstanceOf(V1Table.class);
+
+ // load the table
+ Table loadedTable = catalog.loadTable(identifier);
+ assertThat(loadedTable).isInstanceOf(V1Table.class);
+ }
+
+ StructType newSchema = new StructType().add("LongType", "Long");
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
+ newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+ assertThatThrownBy(
+ () -> catalog.createTable(identifier, newSchema, new Transform[0],
newProperties))
+ .isInstanceOf(TableAlreadyExistsException.class);
+ }
+
@Test
public void testUnsupportedOperations() {
String[] namespace = new String[] {"ns1"};
Identifier identifier = Identifier.of(namespace, "table1");
Identifier new_identifier = Identifier.of(namespace, "table2");
// table methods
- assertThatThrownBy(() -> catalog.loadTable(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(
- () -> catalog.createTable(identifier,
Mockito.mock(StructType.class), null, null))
- .isInstanceOf(UnsupportedOperationException.class);
assertThatThrownBy(() -> catalog.alterTable(identifier))
.isInstanceOf(UnsupportedOperationException.class);
assertThatThrownBy(() -> catalog.dropTable(identifier))
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
new file mode 100644
index 000000000..542fd05d8
--- /dev/null
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.polaris.service.types.GenericTable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class DeserializationTest {
+ private ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp() {
+ mapper = new ObjectMapper();
+ }
+
+ @ParameterizedTest
+ @MethodSource("genericTableTestCases")
+ public void testLoadGenericTableRESTResponse(String doc, Map<String, String> properties)
+ throws JsonProcessingException {
+ GenericTable table =
+ GenericTable.builder()
+ .setFormat("delta")
+ .setName("test-table")
+ .setProperties(properties)
+ .setDoc(doc)
+ .build();
+ LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+ String json = mapper.writeValueAsString(response);
+ LoadGenericTableRESTResponse deserializedResponse =
+ mapper.readValue(json, LoadGenericTableRESTResponse.class);
+ assertThat(deserializedResponse.getTable().getFormat()).isEqualTo("delta");
+ assertThat(deserializedResponse.getTable().getName()).isEqualTo("test-table");
+ assertThat(deserializedResponse.getTable().getDoc()).isEqualTo(doc);
+ assertThat(deserializedResponse.getTable().getProperties().size()).isEqualTo(properties.size());
+ }
+
+ @ParameterizedTest
+ @MethodSource("genericTableTestCases")
+ public void testCreateGenericTableRESTRequest(String doc, Map<String, String> properties)
+ throws JsonProcessingException {
+ CreateGenericTableRESTRequest request =
+ new CreateGenericTableRESTRequest("test-table", "delta", doc,
properties);
+ String json = mapper.writeValueAsString(request);
+ CreateGenericTableRESTRequest deserializedRequest =
+ mapper.readValue(json, CreateGenericTableRESTRequest.class);
+ assertThat(deserializedRequest.getName()).isEqualTo("test-table");
+ assertThat(deserializedRequest.getFormat()).isEqualTo("delta");
+ assertThat(deserializedRequest.getDoc()).isEqualTo(doc);
+ assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
+ }
+
+ private static Stream<Arguments> genericTableTestCases() {
+ var doc = "table for testing";
+ var properties = Maps.newHashMap();
+ properties.put("location", "s3://path/to/table/");
+ return Stream.of(
+ Arguments.of(doc, properties),
+ Arguments.of(null, Maps.newHashMap()),
+ Arguments.of(doc, Maps.newHashMap()),
+ Arguments.of(null, properties));
+ }
+}