This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 475d10a9c Spark: Add rest table operations (drop, list, purge and rename etc) for Spark Client (#1368)
475d10a9c is described below
commit 475d10a9c85cda80528ba5f24c10a7ed71df5cdf
Author: gh-yzou <[email protected]>
AuthorDate: Thu Apr 17 17:00:08 2025 -0700
Spark: Add rest table operations (drop, list, purge and rename etc) for Spark Client (#1368)
---
.../apache/polaris/spark/PolarisRESTCatalog.java | 55 ++++-
.../apache/polaris/spark/PolarisSparkCatalog.java | 20 +-
.../org/apache/polaris/spark/SparkCatalog.java | 40 ++-
.../spark/rest/ListGenericTablesRESTResponse.java | 45 ++++
.../org/apache/polaris/spark/NoopDeltaCatalog.java | 9 +
.../org/apache/polaris/spark/SparkCatalogTest.java | 268 +++++++++++++++++----
.../polaris/spark/rest/DeserializationTest.java | 57 ++++-
7 files changed, 432 insertions(+), 62 deletions(-)
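For context, these operations surface through standard Spark SQL once the connector is configured. A minimal sketch, assuming a session wired to the Polaris Spark catalog (the catalog name "polaris" and the table names are illustrative):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
        .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
        .getOrCreate();

    spark.sql("SHOW TABLES IN polaris.ns");                          // listTables: Iceberg and generic tables
    spark.sql("ALTER TABLE polaris.ns.t1 RENAME TO polaris.ns.t2");  // renameTable (Iceberg tables only)
    spark.sql("DROP TABLE polaris.ns.t2");                           // dropTable
    spark.sql("DROP TABLE polaris.ns.delta_t PURGE");                // purgeTable (plain drop for generic tables)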
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
index 0b8743132..72d258511 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
@@ -19,8 +19,10 @@
package org.apache.polaris.spark;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -31,6 +33,7 @@ import java.util.function.Function;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.rest.Endpoint;
import org.apache.iceberg.rest.ErrorHandlers;
@@ -39,7 +42,9 @@ import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.polaris.core.rest.PolarisEndpoints;
import org.apache.polaris.core.rest.PolarisResourcePaths;
import org.apache.polaris.service.types.GenericTable;
@@ -52,6 +57,8 @@ import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
* objects.
*/
public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
private final Function<Map<String, String>, RESTClient> clientBuilder;
private RESTClient restClient = null;
@@ -59,6 +66,7 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
private Set<Endpoint> endpoints;
private OAuth2Util.AuthSession catalogAuth = null;
private PolarisResourcePaths pathGenerator = null;
+ private Integer pageSize = null;
// the default endpoints to config if server doesn't specify the 'endpoints' configuration.
private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
@@ -101,6 +109,12 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+ this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
+ if (pageSize != null) {
+ Preconditions.checkArgument(
+ pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
+ }
+
this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.restClient);
this.closeables.setSuppressCloseFailure(true);
@@ -138,12 +152,49 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
@Override
public List<TableIdentifier> listGenericTables(Namespace ns) {
- throw new UnsupportedOperationException("listTables not supported");
+ Endpoint.check(endpoints, PolarisEndpoints.V1_LIST_GENERIC_TABLES);
+
+ Map<String, String> queryParams = Maps.newHashMap();
+ ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
+ String pageToken = "";
+ if (pageSize != null) {
+ queryParams.put("pageSize", String.valueOf(pageSize));
+ }
+
+ do {
+ queryParams.put("pageToken", pageToken);
+ ListTablesResponse response =
+ restClient
+ .withAuthSession(this.catalogAuth)
+ .get(
+ pathGenerator.genericTables(ns),
+ queryParams,
+ ListTablesResponse.class,
+ Map.of(),
+ ErrorHandlers.namespaceErrorHandler());
+ pageToken = response.nextPageToken();
+ tables.addAll(response.identifiers());
+ } while (pageToken != null);
+
+ return tables.build();
}
@Override
public boolean dropGenericTable(TableIdentifier identifier) {
- throw new UnsupportedOperationException("dropTable not supported");
+ Endpoint.check(endpoints, PolarisEndpoints.V1_DELETE_GENERIC_TABLE);
+
+ try {
+ restClient
+ .withAuthSession(this.catalogAuth)
+ .delete(
+ pathGenerator.genericTable(identifier),
+ null,
+ Map.of(),
+ ErrorHandlers.tableErrorHandler());
+ return true;
+ } catch (NoSuchTableException e) {
+ return false;
+ }
}
@Override
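The pagination loop above is driven by the optional rest-page-size property validated during initialization. A minimal sketch of setting it, assuming Spark catalog options (spark.sql.catalog.<name>.*) flow into the merged catalog properties consumed above (catalog name and page size are illustrative):

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .set("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
        // each listGenericTables call then pages through results 50 identifiers at a time
        .set("spark.sql.catalog.polaris.rest-page-size", "50");

When the property is unset, pageSize stays null, the pageSize query parameter is omitted, and the server default applies; a non-positive value fails the Preconditions check.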
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
index 8f8c07fba..e1658312b 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -19,6 +19,7 @@
package org.apache.polaris.spark;
import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.polaris.service.types.GenericTable;
@@ -90,12 +91,19 @@ public class PolarisSparkCatalog implements TableCatalog {
@Override
public Table alterTable(Identifier identifier, TableChange... changes)
throws NoSuchTableException {
- throw new NoSuchTableException(identifier);
+ // alterTable currently is not supported for generic tables
+ throw new UnsupportedOperationException("alterTable operation is not
supported");
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) {
+ // purgeTable for generic table will only do a drop without purge
+ return dropTable(ident);
}
@Override
public boolean dropTable(Identifier identifier) {
- return false;
+ return this.polarisCatalog.dropGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
}
@Override
@@ -106,6 +114,12 @@ public class PolarisSparkCatalog implements TableCatalog {
@Override
public Identifier[] listTables(String[] namespace) {
- throw new UnsupportedOperationException("listTables operation is not supported");
+ try {
+ return this.polarisCatalog.listGenericTables(Namespace.of(namespace)).stream()
+ .map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
+ .toArray(Identifier[]::new);
+ } catch (UnsupportedOperationException ex) {
+ return new Identifier[0];
+ }
}
}
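The new listTables and dropTable bridge two identifier models: Spark's Identifier carries a String[] of namespace levels, while Iceberg's TableIdentifier wraps a Namespace. A minimal round-trip sketch of the two conversions used in this file:

    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.spark.sql.connector.catalog.Identifier;

    Identifier sparkIdent = Identifier.of(new String[] {"ns", "nsl2"}, "events");
    // Spark -> Iceberg, as in dropTable above
    TableIdentifier icebergIdent = Spark3Util.identifierToTableIdentifier(sparkIdent);
    // Iceberg -> Spark, as in listTables above
    Identifier roundTripped = Identifier.of(icebergIdent.namespace().levels(), icebergIdent.name());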
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index cf46d9a15..e88628a70 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -20,7 +20,9 @@ package org.apache.polaris.spark;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import java.util.Arrays;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -161,33 +163,59 @@ public class SparkCatalog
@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
- throw new UnsupportedOperationException("alterTable");
+ try {
+ return this.icebergsSparkCatalog.alterTable(ident, changes);
+ } catch (NoSuchTableException e) {
+ Table table = this.polarisSparkCatalog.loadTable(ident);
+ String provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+ if (PolarisCatalogUtils.useDelta(provider)) {
+ // For delta tables, most alter operations are delta log manipulations,
+ // so we load the delta catalog to help handle the alter table operation.
+ // NOTE: This currently doesn't work for changing file location and file format
+ // using ALTER TABLE ... SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
+ TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+ return deltaCatalog.alterTable(ident, changes);
+ }
+ return this.polarisSparkCatalog.alterTable(ident);
+ }
}
@Override
public boolean dropTable(Identifier ident) {
- throw new UnsupportedOperationException("dropTable");
+ return this.icebergsSparkCatalog.dropTable(ident) || this.polarisSparkCatalog.dropTable(ident);
}
@Override
public void renameTable(Identifier from, Identifier to)
throws NoSuchTableException, TableAlreadyExistsException {
- throw new UnsupportedOperationException("renameTable");
+ try {
+ this.icebergsSparkCatalog.renameTable(from, to);
+ } catch (NoSuchTableException e) {
+ this.polarisSparkCatalog.renameTable(from, to);
+ }
}
@Override
public void invalidateTable(Identifier ident) {
- throw new UnsupportedOperationException("invalidateTable");
+ this.icebergsSparkCatalog.invalidateTable(ident);
}
@Override
public boolean purgeTable(Identifier ident) {
- throw new UnsupportedOperationException("purgeTable");
+ if (this.icebergsSparkCatalog.purgeTable(ident)) {
+ return true;
+ } else {
+ return this.polarisSparkCatalog.purgeTable(ident);
+ }
}
@Override
public Identifier[] listTables(String[] namespace) {
- throw new UnsupportedOperationException("listTables");
+ Identifier[] icebergIdents = this.icebergsSparkCatalog.listTables(namespace);
+ Identifier[] genericTableIdents = this.polarisSparkCatalog.listTables(namespace);
+
+ return Stream.concat(Arrays.stream(icebergIdents), Arrays.stream(genericTableIdents))
+ .toArray(Identifier[]::new);
}
@Override
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
new file mode 100644
index 000000000..ede2c89a9
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.polaris.service.types.ListGenericTablesResponse;
+
+/**
+ * RESTResponse definition for ListGenericTable which extends the iceberg RESTResponse. This is
+ * currently required because the Iceberg HTTPClient requires the request and response to be a class
+ * of RESTRequest and RESTResponse.
+ */
+public class ListGenericTablesRESTResponse extends ListGenericTablesResponse
+ implements RESTResponse {
+
+ @JsonCreator
+ public ListGenericTablesRESTResponse(
+ @JsonProperty(value = "next-page-token") String nextPageToken,
+ @JsonProperty(value = "identifiers") Set<TableIdentifier> identifiers) {
+ super(nextPageToken, identifiers);
+ }
+
+ @Override
+ public void validate() {}
+}
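As exercised in the DeserializationTest changes below, this wrapper round-trips through a kebab-case ObjectMapper with Iceberg's REST serializers registered. A minimal sketch (mapper settings abbreviated relative to the test setup below; the payload comment is approximate):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.PropertyNamingStrategies;
    import java.util.Set;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.rest.RESTSerializers;

    ObjectMapper mapper = new ObjectMapper();
    mapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
    RESTSerializers.registerAll(mapper); // TableIdentifier/Namespace (de)serializers

    ListGenericTablesRESTResponse response =
        new ListGenericTablesRESTResponse("page-token", Set.of(TableIdentifier.of("ns", "t1")));
    String json = mapper.writeValueAsString(response);
    // json is roughly {"next-page-token":"page-token","identifiers":[{"namespace":["ns"],"name":"t1"}]}
    ListGenericTablesRESTResponse parsed =
        mapper.readValue(json, ListGenericTablesRESTResponse.class);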
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
index c11e8de3b..f698615e6 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
@@ -18,7 +18,11 @@
*/
package org.apache.polaris.spark;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableChange;
/**
 * This is a fake delta catalog class that is used for testing. This class is a noop class that
@@ -29,4 +33,9 @@ public class NoopDeltaCatalog extends DelegatingCatalogExtension {
// This is a mock of isUnityCatalog scala val in
// org.apache.spark.sql.delta.catalog.DeltaCatalog.
private boolean isUnityCatalog = false;
+
+ @Override
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ return super.loadTable(ident);
+ }
}
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 0d142cbcb..6aa4a3c08 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -28,20 +28,25 @@ import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.actions.DeleteReachableFiles;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.polaris.spark.utils.DeltaHelper;
import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.catalog.V1Table;
import org.apache.spark.sql.connector.catalog.View;
@@ -106,6 +111,8 @@ public class SparkCatalogTest {
private String catalogName;
private static final String[] defaultNS = new String[] {"ns"};
+ private static StructType defaultSchema =
+ new StructType().add("id", "long").add("name", "string");
@BeforeEach
public void setup() throws Exception {
@@ -321,8 +328,6 @@ public class SparkCatalogTest {
// create a new namespace under the default NS
String[] namespace = new String[] {"ns", "nsl2"};
catalog.createNamespace(namespace, Maps.newHashMap());
- // table schema
- StructType schema = new StructType().add("id", "long").add("name", "string");
// create under defaultNS
String view1Name = "test-view1";
String view1SQL = "select id from test-table where id >= 3";
@@ -331,7 +336,7 @@ public class SparkCatalogTest {
view1SQL,
catalogName,
defaultNS,
- schema,
+ defaultSchema,
new String[0],
new String[0],
new String[0],
@@ -348,7 +353,7 @@ public class SparkCatalogTest {
nsl2ViewSQLs[i],
catalogName,
namespace,
- schema,
+ defaultSchema,
new String[0],
new String[0],
new String[0],
@@ -368,39 +373,222 @@ public class SparkCatalogTest {
}
@Test
- void testCreateAndLoadIcebergTable() throws Exception {
+ void testIcebergTableOperations() throws Exception {
Identifier identifier = Identifier.of(defaultNS, "iceberg-table");
- Map<String, String> properties = Maps.newHashMap();
- properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
- properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
- StructType schema = new StructType().add("boolType", "boolean");
-
- Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
- assertThat(createdTable).isInstanceOf(SparkTable.class);
+ createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, "iceberg");
// load the table
Table table = catalog.loadTable(identifier);
// verify iceberg SparkTable is loaded
assertThat(table).isInstanceOf(SparkTable.class);
+ Identifier[] icebergTables = catalog.listTables(defaultNS);
+ assertThat(icebergTables.length).isEqualTo(1);
+ assertThat(icebergTables[0]).isEqualTo(Identifier.of(defaultNS, "iceberg-table"));
+
// verify create table with the same identifier fails with spark TableAlreadyExistsException
- StructType newSchema = new StructType().add("LongType", "Long");
Map<String, String> newProperties = Maps.newHashMap();
newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
assertThatThrownBy(
- () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
+ () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
.isInstanceOf(TableAlreadyExistsException.class);
+
+ // drop the iceberg table
+ catalog.dropTable(identifier);
+ assertThatThrownBy(() -> catalog.loadTable(identifier))
+ .isInstanceOf(NoSuchTableException.class);
+ assertThat(catalog.listTables(defaultNS)).isEmpty();
}
@ParameterizedTest
@ValueSource(strings = {"delta", "csv"})
void testCreateAndLoadGenericTable(String format) throws Exception {
Identifier identifier = Identifier.of(defaultNS, "generic-test-table");
+ createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, format);
+
+ Identifier[] icebergTables = catalog.listTables(defaultNS);
+ assertThat(icebergTables.length).isEqualTo(1);
+ assertThat(icebergTables[0]).isEqualTo(Identifier.of(defaultNS, "generic-test-table"));
+
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
+ newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+ assertThatThrownBy(
+ () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
+ .isInstanceOf(TableAlreadyExistsException.class);
+
+ // drop the generic table
+ catalog.dropTable(identifier);
+ assertThatThrownBy(() -> catalog.loadTable(identifier))
+ .isInstanceOf(NoSuchTableException.class);
+ assertThat(catalog.listTables(defaultNS)).isEmpty();
+ }
+
+ @Test
+ void testMixedTables() throws Exception {
+ // create two iceberg tables, and three non-iceberg tables
+ String[] tableNames = new String[] {"iceberg1", "iceberg2", "delta1", "csv1", "delta2"};
+ String[] tableFormats = new String[] {"iceberg", null, "delta", "csv", "delta"};
+ for (int i = 0; i < tableNames.length; i++) {
+ Identifier identifier = Identifier.of(defaultNS, tableNames[i]);
+ createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, tableFormats[i]);
+ }
+
+ // list all tables
+ Identifier[] tableIdents = catalog.listTables(defaultNS);
+ assertThat(tableIdents.length).isEqualTo(tableNames.length);
+ for (String name : tableNames) {
+ assertThat(tableIdents).contains(Identifier.of(defaultNS, name));
+ }
+
+ // drop the iceberg2 and delta2 tables
+ catalog.dropTable(Identifier.of(defaultNS, "iceberg2"));
+ catalog.dropTable(Identifier.of(defaultNS, "delta2"));
+
+ String[] remainingTableNames = new String[] {"iceberg1", "delta1", "csv1"};
+ Identifier[] remainingTableIdents = catalog.listTables(defaultNS);
+ assertThat(remainingTableIdents.length).isEqualTo(remainingTableNames.length);
+ for (String name : remainingTableNames) {
+ assertThat(remainingTableIdents).contains(Identifier.of(defaultNS, name));
+ }
+
+ // drop the remaining tables
+ for (String name : remainingTableNames) {
+ catalog.dropTable(Identifier.of(defaultNS, name));
+ }
+ assertThat(catalog.listTables(defaultNS)).isEmpty();
+ }
+
+ @Test
+ void testAlterAndRenameTable() throws Exception {
+ String icebergTableName = "iceberg-table";
+ String deltaTableName = "delta-table";
+ String csvTableName = "csv-table";
+ Identifier icebergIdent = Identifier.of(defaultNS, icebergTableName);
+ Identifier deltaIdent = Identifier.of(defaultNS, deltaTableName);
+ Identifier csvIdent = Identifier.of(defaultNS, csvTableName);
+ createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+ createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+ createAndValidateGenericTableWithLoad(catalog, csvIdent, defaultSchema, "csv");
+
+ // verify alter iceberg table
+ Table newIcebergTable =
+ catalog.alterTable(icebergIdent, TableChange.setProperty("iceberg_key", "iceberg_value"));
+ assertThat(newIcebergTable).isInstanceOf(SparkTable.class);
+ assertThat(newIcebergTable.properties()).contains(Map.entry("iceberg_key", "iceberg_value"));
+
+ // verify rename iceberg table works
+ Identifier newIcebergIdent = Identifier.of(defaultNS, "new-iceberg-table");
+ catalog.renameTable(icebergIdent, newIcebergIdent);
+ assertThatThrownBy(() -> catalog.loadTable(icebergIdent))
+ .isInstanceOf(NoSuchTableException.class);
+ Table icebergTable = catalog.loadTable(newIcebergIdent);
+ assertThat(icebergTable).isInstanceOf(SparkTable.class);
+
+ // verify alter delta table is a no-op, and alter csv table throws an exception
+ SQLConf conf = new SQLConf();
+ try (MockedStatic<SparkSession> mockedStaticSparkSession =
+ Mockito.mockStatic(SparkSession.class);
+ MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
+ MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
+ Mockito.mockStatic(DataSourceV2Utils.class)) {
+ SparkSession mockedSession = Mockito.mock(SparkSession.class);
+ mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+ SessionState mockedState = Mockito.mock(SessionState.class);
+ Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
+ Mockito.when(mockedState.conf()).thenReturn(conf);
+
+ TableProvider deltaProvider = Mockito.mock(TableProvider.class);
+ mockedStaticDS
+ .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("delta"),
Mockito.any()))
+ .thenReturn(Option.apply(deltaProvider));
+ V1Table deltaTable = Mockito.mock(V1Table.class);
+ Map<String, String> deltaProps = Maps.newHashMap();
+ deltaProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "delta");
+ deltaProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/test-delta/");
+ Mockito.when(deltaTable.properties()).thenReturn(deltaProps);
+ mockedStaticDSV2
+ .when(
+ () ->
+ DataSourceV2Utils.getTableFromProvider(
+ Mockito.eq(deltaProvider), Mockito.any(), Mockito.any()))
+ .thenReturn(deltaTable);
+
+ Table delta =
+ catalog.alterTable(deltaIdent, TableChange.setProperty("delta_key", "delta_value"));
+ assertThat(delta).isInstanceOf(V1Table.class);
+
+ TableProvider csvProvider = Mockito.mock(TableProvider.class);
+ mockedStaticDS
+ .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("csv"),
Mockito.any()))
+ .thenReturn(Option.apply(csvProvider));
+ Map<String, String> csvProps = Maps.newHashMap();
+ csvProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "csv");
+ V1Table csvTable = Mockito.mock(V1Table.class);
+ Mockito.when(csvTable.properties()).thenReturn(csvProps);
+ mockedStaticDSV2
+ .when(
+ () ->
+ DataSourceV2Utils.getTableFromProvider(
+ Mockito.eq(csvProvider), Mockito.any(), Mockito.any()))
+ .thenReturn(csvTable);
+ assertThatThrownBy(
+ () -> catalog.alterTable(csvIdent, TableChange.setProperty("csv_key", "csv_value")))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ // verify rename non-iceberg table is not supported
+ assertThatThrownBy(
+ () -> catalog.renameTable(deltaIdent, Identifier.of(defaultNS, "new-delta-table")))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(
+ () -> catalog.renameTable(csvIdent, Identifier.of(defaultNS, "new-csv-table")))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ void testPurgeInvalidateTable() throws Exception {
+ Identifier icebergIdent = Identifier.of(defaultNS, "iceberg-table");
+ Identifier deltaIdent = Identifier.of(defaultNS, "delta-table");
+ createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+ createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+
+ // test invalidate table is a no op today
+ catalog.invalidateTable(icebergIdent);
+ catalog.invalidateTable(deltaIdent);
+
+ Identifier[] tableIdents = catalog.listTables(defaultNS);
+ assertThat(tableIdents.length).isEqualTo(2);
+
+ // verify purge tables drops the table
+ catalog.purgeTable(deltaIdent);
+ assertThat(catalog.listTables(defaultNS).length).isEqualTo(1);
+
+ // purge iceberg table triggers file deletion
+ try (MockedStatic<SparkActions> mockedStaticActions = Mockito.mockStatic(SparkActions.class)) {
+ SparkActions actions = Mockito.mock(SparkActions.class);
+ DeleteReachableFilesSparkAction deleteAction =
+ Mockito.mock(DeleteReachableFilesSparkAction.class);
+ mockedStaticActions.when(SparkActions::get).thenReturn(actions);
+ Mockito.when(actions.deleteReachableFiles(Mockito.any())).thenReturn(deleteAction);
+ Mockito.when(deleteAction.io(Mockito.any())).thenReturn(deleteAction);
+ Mockito.when(deleteAction.execute())
+ .thenReturn(Mockito.mock(DeleteReachableFiles.Result.class));
+
+ catalog.purgeTable(icebergIdent);
+ }
+ assertThat(catalog.listTables(defaultNS).length).isEqualTo(0);
+ }
+
+ private void createAndValidateGenericTableWithLoad(
+ InMemorySparkCatalog sparkCatalog, Identifier identifier, StructType schema, String format)
+ throws Exception {
Map<String, String> properties = Maps.newHashMap();
properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
- properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/");
- StructType schema = new StructType().add("boolType", "boolean");
+ properties.put(
+ TableCatalog.PROP_LOCATION,
+ String.format("file:///tmp/delta/path/to/table/%s/", identifier.name()));
SQLConf conf = new SQLConf();
try (MockedStatic<SparkSession> mockedStaticSparkSession =
@@ -425,40 +613,20 @@ public class SparkCatalogTest {
DataSourceV2Utils.getTableFromProvider(
Mockito.eq(provider), Mockito.any(), Mockito.any()))
.thenReturn(table);
- Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
- assertThat(createdTable).isInstanceOf(V1Table.class);
-
- // load the table
- Table loadedTable = catalog.loadTable(identifier);
- assertThat(loadedTable).isInstanceOf(V1Table.class);
+ Table createdTable =
+ sparkCatalog.createTable(identifier, schema, new Transform[0], properties);
+ Table loadedTable = sparkCatalog.loadTable(identifier);
+
+ // verify the create and load table result
+ if (PolarisCatalogUtils.useIceberg(format)) {
+ // iceberg SparkTable is returned for iceberg tables
+ assertThat(createdTable).isInstanceOf(SparkTable.class);
+ assertThat(loadedTable).isInstanceOf(SparkTable.class);
+ } else {
+ // Spark V1 table is returned for non-iceberg tables
+ assertThat(createdTable).isInstanceOf(V1Table.class);
+ assertThat(loadedTable).isInstanceOf(V1Table.class);
+ }
}
-
- StructType newSchema = new StructType().add("LongType", "Long");
- Map<String, String> newProperties = Maps.newHashMap();
- newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
- newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
- assertThatThrownBy(
- () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
- .isInstanceOf(TableAlreadyExistsException.class);
- }
-
- @Test
- public void testUnsupportedOperations() {
- String[] namespace = new String[] {"ns1"};
- Identifier identifier = Identifier.of(namespace, "table1");
- Identifier new_identifier = Identifier.of(namespace, "table2");
- // table methods
- assertThatThrownBy(() -> catalog.alterTable(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.dropTable(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.renameTable(identifier, new_identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.listTables(namespace))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.invalidateTable(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.purgeTable(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
}
}
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
index 542fd05d8..e6747e653 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -20,23 +20,48 @@ package org.apache.polaris.spark.rest;
import static org.assertj.core.api.Assertions.assertThat;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Stream;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTSerializers;
import org.apache.polaris.service.types.GenericTable;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public class DeserializationTest {
private ObjectMapper mapper;
+ private static final JsonFactory FACTORY =
+ new JsonFactoryBuilder()
+ .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
+ .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
+ .build();
@BeforeEach
public void setUp() {
- mapper = new ObjectMapper();
+ // NOTE: This is the same setting as iceberg RESTObjectMapper.java. However,
+ // RESTObjectMapper is not a public class, therefore, we duplicate the
+ // setting here for serialization and deserialization tests.
+ mapper = new ObjectMapper(FACTORY);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ mapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
+ RESTSerializers.registerAll(mapper);
}
@ParameterizedTest
@@ -75,6 +100,36 @@ public class DeserializationTest {
assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
}
+ @Test
+ public void testListGenericTablesRESTResponse() throws JsonProcessingException {
+ Namespace namespace = Namespace.of("test-ns");
+ Set<TableIdentifier> idents =
+ ImmutableSet.of(
+ TableIdentifier.of(namespace, "table1"),
+ TableIdentifier.of(namespace, "table2"),
+ TableIdentifier.of(namespace, "table3"));
+
+ // page token is null
+ ListGenericTablesRESTResponse response = new ListGenericTablesRESTResponse(null, idents);
+ String json = mapper.writeValueAsString(response);
+ ListGenericTablesRESTResponse deserializedResponse =
+ mapper.readValue(json, ListGenericTablesRESTResponse.class);
+ assertThat(deserializedResponse.getNextPageToken()).isNull();
+ assertThat(deserializedResponse.getIdentifiers().size()).isEqualTo(idents.size());
+ for (TableIdentifier identifier : idents) {
+ assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+ }
+
+ // page token is not null
+ response = new ListGenericTablesRESTResponse("page-token", idents);
+ json = mapper.writeValueAsString(response);
+ deserializedResponse = mapper.readValue(json, ListGenericTablesRESTResponse.class);
+ assertThat(deserializedResponse.getNextPageToken()).isEqualTo("page-token");
+ for (TableIdentifier identifier : idents) {
+ assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+ }
+ }
+
private static Stream<Arguments> genericTableTestCases() {
var doc = "table for testing";
var properties = Maps.newHashMap();