This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new ab799a8ac Spark: Add Namespaces and View support for SparkCatalog (#1332)
ab799a8ac is described below
commit ab799a8ac8099f137216e2675a0be19cac0901fd
Author: gh-yzou <[email protected]>
AuthorDate: Wed Apr 9 11:16:29 2025 -0700
Spark: Add Namespaces and View support for SparkCatalog (#1332)
---
.../org/apache/polaris/spark/SparkCatalog.java | 120 +++++++--
.../org/apache/polaris/spark/SparkCatalogTest.java | 278 ++++++++++++++++++---
2 files changed, 350 insertions(+), 48 deletions(-)
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index 2ec0450a0..e38bbe1ad 100644
--- a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -21,14 +21,35 @@ package org.apache.polaris.spark;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.*;
-import org.apache.spark.sql.connector.catalog.*;
+import org.apache.iceberg.spark.SupportsReplaceView;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class SparkCatalog implements TableCatalog, SupportsNamespaces,
ViewCatalog {
+public class SparkCatalog
+ implements StagingTableCatalog,
+ TableCatalog,
+ SupportsNamespaces,
+ ViewCatalog,
+ SupportsReplaceView {
+
private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
private String catalogName = null;
private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
@@ -43,6 +64,8 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces, ViewCatal
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
+ this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
+ this.icebergsSparkCatalog.initialize(name, options);
}
@Override
@@ -73,58 +96,88 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces, ViewCatal
throw new UnsupportedOperationException("renameTable");
}
+ @Override
+ public void invalidateTable(Identifier ident) {
+ throw new UnsupportedOperationException("invalidateTable");
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) {
+ throw new UnsupportedOperationException("purgeTable");
+ }
+
@Override
public Identifier[] listTables(String[] namespace) {
throw new UnsupportedOperationException("listTables");
}
+ @Override
+ public StagedTable stageCreate(
+ Identifier ident, StructType schema, Transform[] transforms, Map<String,
String> properties)
+ throws TableAlreadyExistsException {
+ return this.icebergsSparkCatalog.stageCreate(ident, schema, transforms,
properties);
+ }
+
+ @Override
+ public StagedTable stageReplace(
+ Identifier ident, StructType schema, Transform[] transforms, Map<String,
String> properties)
+ throws NoSuchTableException {
+ return this.icebergsSparkCatalog.stageReplace(ident, schema, transforms,
properties);
+ }
+
+ @Override
+ public StagedTable stageCreateOrReplace(
+ Identifier ident, StructType schema, Transform[] transforms, Map<String,
String> properties) {
+ return this.icebergsSparkCatalog.stageCreateOrReplace(ident, schema,
transforms, properties);
+ }
+
@Override
public String[] defaultNamespace() {
- throw new UnsupportedOperationException("defaultNamespace");
+ return this.icebergsSparkCatalog.defaultNamespace();
}
@Override
public String[][] listNamespaces() {
- throw new UnsupportedOperationException("listNamespaces");
+ return this.icebergsSparkCatalog.listNamespaces();
}
@Override
public String[][] listNamespaces(String[] namespace) throws
NoSuchNamespaceException {
- throw new UnsupportedOperationException("listNamespaces");
+ return this.icebergsSparkCatalog.listNamespaces(namespace);
}
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
- throw new UnsupportedOperationException("loadNamespaceMetadata");
+ return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
}
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
- throw new UnsupportedOperationException("createNamespace");
+ this.icebergsSparkCatalog.createNamespace(namespace, metadata);
}
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
- throw new UnsupportedOperationException("alterNamespace");
+ this.icebergsSparkCatalog.alterNamespace(namespace, changes);
}
@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
- throw new UnsupportedOperationException("dropNamespace");
+ return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
}
@Override
public Identifier[] listViews(String... namespace) {
- throw new UnsupportedOperationException("listViews");
+ return this.icebergsSparkCatalog.listViews(namespace);
}
@Override
public View loadView(Identifier ident) throws NoSuchViewException {
- throw new UnsupportedOperationException("loadView");
+ return this.icebergsSparkCatalog.loadView(ident);
}
@Override
@@ -139,23 +192,56 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces, ViewCatal
String[] columnComments,
Map<String, String> properties)
throws ViewAlreadyExistsException, NoSuchNamespaceException {
- throw new UnsupportedOperationException("createView");
+ return this.icebergsSparkCatalog.createView(
+ ident,
+ sql,
+ currentCatalog,
+ currentNamespace,
+ schema,
+ queryColumnNames,
+ columnAliases,
+ columnComments,
+ properties);
}
@Override
public View alterView(Identifier ident, ViewChange... changes)
throws NoSuchViewException, IllegalArgumentException {
- throw new UnsupportedOperationException("alterView");
+ return this.icebergsSparkCatalog.alterView(ident, changes);
}
@Override
public boolean dropView(Identifier ident) {
- throw new UnsupportedOperationException("dropView");
+ return this.icebergsSparkCatalog.dropView(ident);
}
@Override
public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
throws NoSuchViewException, ViewAlreadyExistsException {
- throw new UnsupportedOperationException("renameView");
+ this.icebergsSparkCatalog.renameView(fromIdentifier, toIdentifier);
+ }
+
+ @Override
+ public View replaceView(
+ Identifier ident,
+ String sql,
+ String currentCatalog,
+ String[] currentNamespace,
+ StructType schema,
+ String[] queryColumnNames,
+ String[] columnAliases,
+ String[] columnComments,
+ Map<String, String> properties)
+ throws NoSuchNamespaceException, NoSuchViewException {
+ return this.icebergsSparkCatalog.replaceView(
+ ident,
+ sql,
+ currentCatalog,
+ currentNamespace,
+ schema,
+ queryColumnNames,
+ columnAliases,
+ columnComments,
+ properties);
}
}
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 50c1e645a..70e9b00c5 100644
--- a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -19,30 +19,271 @@
package org.apache.polaris.spark;
import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
public class SparkCatalogTest {
private SparkCatalog catalog;
private String catalogName;
+ private static final String[] defaultNS = new String[] {"ns"};
+ private static final Schema defaultSchema =
+ new Schema(
+ 5,
+ required(3, "id", Types.IntegerType.get(), "unique ID"),
+ required(4, "data", Types.StringType.get()));
+
@BeforeEach
- public void setup() {
+ public void setup() throws Exception {
catalogName = "test_" + UUID.randomUUID();
Map<String, String> catalogConfig = Maps.newHashMap();
catalogConfig.put(CATALOG_IMPL,
"org.apache.iceberg.inmemory.InMemoryCatalog");
catalogConfig.put("cache-enabled", "false");
+
catalog = new SparkCatalog();
- catalog.initialize(catalogName, new
CaseInsensitiveStringMap(catalogConfig));
+ Configuration conf = new Configuration();
+ try (MockedStatic<SparkSession> mockedStaticSparkSession =
+ Mockito.mockStatic(SparkSession.class);
+ MockedStatic<SparkUtil> mockedSparkUtil =
Mockito.mockStatic(SparkUtil.class)) {
+ SparkSession mockedSession = Mockito.mock(SparkSession.class);
+
mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+ mockedSparkUtil
+ .when(() -> SparkUtil.hadoopConfCatalogOverrides(mockedSession,
catalogName))
+ .thenReturn(conf);
+ SparkContext mockedContext = Mockito.mock(SparkContext.class);
+ Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext);
+ Mockito.when(mockedContext.applicationId()).thenReturn("appId");
+ Mockito.when(mockedContext.sparkUser()).thenReturn("test-user");
+ Mockito.when(mockedContext.version()).thenReturn("3.5");
+
+ catalog.initialize(catalogName, new
CaseInsensitiveStringMap(catalogConfig));
+ }
+ catalog.createNamespace(defaultNS, Maps.newHashMap());
+ }
+
+ @Test
+ void testCreateAndLoadNamespace() throws Exception {
+ String[] namespace = new String[] {"ns1"};
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("key1", "value1");
+
+ // no namespace can be found
+ assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
+ .isInstanceOf(NoSuchNamespaceException.class);
+
+ // create the namespace
+ catalog.createNamespace(namespace, metadata);
+
+ Map<String, String> nsMetadata = catalog.loadNamespaceMetadata(namespace);
+ assertThat(nsMetadata).contains(Map.entry("key1", "value1"));
+ }
+
+ @Test
+ void testDropAndListNamespaces() throws Exception {
+ String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}};
+ String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1",
"l2ns2"}};
+ String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}};
+
+ // create the namespaces
+ for (String[] namespace : lv1ns) {
+ catalog.createNamespace(namespace, Maps.newHashMap());
+ }
+ for (String[] namespace : lv2ns1) {
+ catalog.createNamespace(namespace, Maps.newHashMap());
+ }
+ for (String[] namespace : lv2ns2) {
+ catalog.createNamespace(namespace, Maps.newHashMap());
+ }
+
+ // list namespaces under root
+ String[][] lv1nsResult = catalog.listNamespaces();
+ assertThat(lv1nsResult.length).isEqualTo(lv1ns.length + 1);
+ assertThat(Arrays.asList(lv1nsResult)).contains(defaultNS);
+ for (String[] namespace : lv1ns) {
+ assertThat(Arrays.asList(lv1nsResult)).contains(namespace);
+ }
+ // list namespace under l1ns1
+ String[][] lv2ns1Result = catalog.listNamespaces(lv1ns[0]);
+ assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length);
+ for (String[] namespace : lv2ns1) {
+ assertThat(Arrays.asList(lv2ns1Result)).contains(namespace);
+ }
+ // list namespace under l1ns2
+ String[][] lv2ns2Result = catalog.listNamespaces(lv1ns[1]);
+ assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length);
+ for (String[] namespace : lv2ns2) {
+ assertThat(Arrays.asList(lv2ns2Result)).contains(namespace);
+ }
+ // no namespace under l1ns2.l2ns3
+ assertThat(catalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0);
+
+ // drop l1ns2
+ catalog.dropNamespace(lv2ns2[0], true);
+ assertThat(catalog.listNamespaces(lv1ns[1]).length).isEqualTo(0);
+
+ catalog.dropNamespace(lv1ns[1], true);
+ assertThatThrownBy(() -> catalog.listNamespaces(lv1ns[1]))
+ .isInstanceOf(NoSuchNamespaceException.class);
+ }
+
+ @Test
+ void testAlterNamespace() throws Exception {
+ String[] namespace = new String[] {"ns1"};
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("orig_key1", "orig_value1");
+
+ catalog.createNamespace(namespace, metadata);
+ assertThat(catalog.loadNamespaceMetadata(namespace))
+ .contains(Map.entry("orig_key1", "orig_value1"));
+
+ catalog.alterNamespace(namespace, NamespaceChange.setProperty("new_key",
"new_value"));
+ assertThat(catalog.loadNamespaceMetadata(namespace))
+ .contains(Map.entry("new_key", "new_value"));
+ }
+
+ @Test
+ void testStageOperations() throws Exception {
+ Identifier createId = Identifier.of(defaultNS, "iceberg-table-create");
+ Map<String, String> icebergProperties = Maps.newHashMap();
+ icebergProperties.put("provider", "iceberg");
+ icebergProperties.put(TableCatalog.PROP_LOCATION,
"file:///tmp/path/to/iceberg-table/");
+ StructType iceberg_schema = new StructType().add("boolType", "boolean");
+
+ catalog.stageCreate(createId, iceberg_schema, new Transform[0],
icebergProperties);
+
+ catalog.stageCreateOrReplace(createId, iceberg_schema, new Transform[0],
icebergProperties);
+ }
+
+ @Test
+ void testBasicViewOperations() throws Exception {
+ Identifier viewIdentifier = Identifier.of(defaultNS, "test-view");
+ String viewSql = "select id from test-table where id < 3";
+ StructType schema = new StructType().add("id", "long");
+ catalog.createView(
+ viewIdentifier,
+ viewSql,
+ catalogName,
+ defaultNS,
+ schema,
+ new String[0],
+ new String[0],
+ new String[0],
+ Maps.newHashMap());
+
+ // load the view
+ View view = catalog.loadView(viewIdentifier);
+ assertThat(view.query()).isEqualTo(viewSql);
+ assertThat(view.schema()).isEqualTo(schema);
+
+ // alter the view properties
+ catalog.alterView(viewIdentifier, ViewChange.setProperty("view_key1",
"view_value1"));
+ view = catalog.loadView(viewIdentifier);
+ assertThat(view.properties()).contains(Map.entry("view_key1",
"view_value1"));
+
+ // rename the view
+ Identifier newIdentifier = Identifier.of(defaultNS, "new-view");
+ catalog.renameView(viewIdentifier, newIdentifier);
+ assertThatThrownBy(() -> catalog.loadView(viewIdentifier))
+ .isInstanceOf(NoSuchViewException.class);
+ view = catalog.loadView(newIdentifier);
+ assertThat(view.query()).isEqualTo(viewSql);
+ assertThat(view.schema()).isEqualTo(schema);
+
+ // replace the view
+ String newSql = "select id from test-table where id == 3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key1", "value1");
+ catalog.replaceView(
+ newIdentifier,
+ newSql,
+ catalogName,
+ defaultNS,
+ schema,
+ new String[0],
+ new String[0],
+ new String[0],
+ properties);
+ view = catalog.loadView(newIdentifier);
+ assertThat(view.query()).isEqualTo(newSql);
+ assertThat(view.properties()).contains(Map.entry("key1", "value1"));
+
+ // drop the view
+ catalog.dropView(newIdentifier);
+ assertThatThrownBy(() -> catalog.loadView(newIdentifier))
+ .isInstanceOf(NoSuchViewException.class);
+ }
+
+ @Test
+ void testListViews() throws Exception {
+ // create a new namespace under the default NS
+ String[] namespace = new String[] {"ns", "nsl2"};
+ catalog.createNamespace(namespace, Maps.newHashMap());
+ // view schema
+ StructType schema = new StructType().add("id", "long").add("name",
"string");
+ // create one view under defaultNS
+ String view1Name = "test-view1";
+ String view1SQL = "select id from test-table where id >= 3";
+ catalog.createView(
+ Identifier.of(defaultNS, view1Name),
+ view1SQL,
+ catalogName,
+ defaultNS,
+ schema,
+ new String[0],
+ new String[0],
+ new String[0],
+ Maps.newHashMap());
+ // create two views under ns.nsl2
+ String[] nsl2ViewNames = new String[] {"test-view2", "test-view3"};
+ String[] nsl2ViewSQLs =
+ new String[] {
+ "select id from test-table where id == 3", "select id from
test-table where id < 3"
+ };
+ for (int i = 0; i < nsl2ViewNames.length; i++) {
+ catalog.createView(
+ Identifier.of(namespace, nsl2ViewNames[i]),
+ nsl2ViewSQLs[i],
+ catalogName,
+ namespace,
+ schema,
+ new String[0],
+ new String[0],
+ new String[0],
+ Maps.newHashMap());
+ }
+ // list views under defaultNS
+ Identifier[] l1Views = catalog.listViews(defaultNS);
+ assertThat(l1Views.length).isEqualTo(1);
+ assertThat(l1Views[0].name()).isEqualTo(view1Name);
+
+ // list views under ns.nsl2
+ Identifier[] l2Views = catalog.listViews(namespace);
+ assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length);
+ for (String name : nsl2ViewNames) {
+ assertThat(Arrays.asList(l2Views)).contains(Identifier.of(namespace,
name));
+ }
}
@Test
@@ -64,34 +305,9 @@ public class SparkCatalogTest {
.isInstanceOf(UnsupportedOperationException.class);
assertThatThrownBy(() -> catalog.listTables(namespace))
.isInstanceOf(UnsupportedOperationException.class);
-
- // namespace methods
- assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.listNamespaces())
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.listNamespaces(namespace))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.createNamespace(namespace, null))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.alterNamespace(namespace))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.dropNamespace(namespace, false))
- .isInstanceOf(UnsupportedOperationException.class);
-
- // view methods
- assertThatThrownBy(() -> catalog.listViews(namespace))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.loadView(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(
- () -> catalog.createView(identifier, null, null, null, null, null,
null, null, null))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.alterView(identifier))
- .isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.dropView(identifier))
+ assertThatThrownBy(() -> catalog.invalidateTable(identifier))
.isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.renameView(identifier, new_identifier))
+ assertThatThrownBy(() -> catalog.purgeTable(identifier))
.isInstanceOf(UnsupportedOperationException.class);
}
}