eric-maynard commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2146083284
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -156,15 +159,23 @@ public Table createTable(
throw new UnsupportedOperationException(
"Create table without location key is not supported by Polaris.
Please provide location or path on table creation.");
}
-
if (PolarisCatalogUtils.useDelta(provider)) {
// For delta table, we load the delta catalog to help dealing with the
// delta log creation.
TableCatalog deltaCatalog =
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.createTable(ident, schema, transforms, properties);
- } else {
- return this.polarisSparkCatalog.createTable(ident, schema, transforms,
properties);
}
+ if (PolarisCatalogUtils.useHudi(provider)) {
+ // First make a call via polaris's spark catalog
+ // to ensure an entity is created within the catalog and is authorized
+ polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+
+ // Then for actually creating the hudi table, we load HoodieCatalog
+ // to create the .hoodie folder in cloud storage
+ TableCatalog hudiCatalog =
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+ return hudiCatalog.createTable(ident, schema, transforms, properties);
+ }
+ return this.polarisSparkCatalog.createTable(ident, schema, transforms,
properties);
Review Comment:
Let's keep the `if { } else if {} else { }` structure
##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkHudiIT extends SparkIntegrationBase {
+
+ @Override
+ protected SparkSession.Builder withCatalog(SparkSession.Builder builder,
String catalogName) {
+ return builder
+ .config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .config(
+ "spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ .config(
+ String.format("spark.sql.catalog.%s", catalogName),
+ "org.apache.polaris.spark.SparkCatalog")
+ .config("spark.sql.warehouse.dir", warehouseDir.toString())
+ .config(String.format("spark.sql.catalog.%s.type", catalogName),
"rest")
+ .config(
+ String.format("spark.sql.catalog.%s.uri", catalogName),
+ endpoints.catalogApiEndpoint().toString())
+ .config(String.format("spark.sql.catalog.%s.warehouse", catalogName),
catalogName)
+ .config(String.format("spark.sql.catalog.%s.scope", catalogName),
"PRINCIPAL_ROLE:ALL")
+ .config(
+ String.format("spark.sql.catalog.%s.header.realm", catalogName),
endpoints.realmId())
+ .config(String.format("spark.sql.catalog.%s.token", catalogName),
sparkToken)
+ .config(String.format("spark.sql.catalog.%s.s3.access-key-id",
catalogName), "fakekey")
+ .config(
+ String.format("spark.sql.catalog.%s.s3.secret-access-key",
catalogName), "fakesecret")
+ .config(String.format("spark.sql.catalog.%s.s3.region", catalogName),
"us-west-2")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar")
+ // for intial integration test have disabled for now, to revisit
enabling in future
+ .config("hoodie.metadata.enable", "false");
+ }
+
+ private String defaultNs;
+ private String tableRootDir;
+
+ private String getTableLocation(String tableName) {
+ return String.format("%s/%s", tableRootDir, tableName);
+ }
+
+ private String getTableNameWithRandomSuffix() {
+ return generateName("huditb");
+ }
+
+ @BeforeEach
+ public void createDefaultResources(@TempDir Path tempDir) {
+ spark.sparkContext().setLogLevel("INFO");
+ defaultNs = generateName("hudi");
+ // create a default namespace
+ sql("CREATE NAMESPACE %s", defaultNs);
+ sql("USE NAMESPACE %s", defaultNs);
+ tableRootDir =
+
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+ }
+
+ @AfterEach
+ public void cleanupHudiData() {
+ // clean up hudi data
+ File dirToDelete = new File(tableRootDir);
+ FileUtils.deleteQuietly(dirToDelete);
+ sql("DROP NAMESPACE %s", defaultNs);
+ }
+
+ @Test
+ public void testBasicTableOperations() {
+ // create a regular hudi table
+ String huditb1 = "huditb1";
+ sql(
+ "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
+ huditb1, getTableLocation(huditb1));
+ sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
+ List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY
id DESC", huditb1);
Review Comment:
nit: Some of these queries could be cleaned up
##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkHudiIT extends SparkIntegrationBase {
+
+ @Override
+ protected SparkSession.Builder withCatalog(SparkSession.Builder builder,
String catalogName) {
+ return builder
+ .config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .config(
+ "spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ .config(
+ String.format("spark.sql.catalog.%s", catalogName),
+ "org.apache.polaris.spark.SparkCatalog")
+ .config("spark.sql.warehouse.dir", warehouseDir.toString())
+ .config(String.format("spark.sql.catalog.%s.type", catalogName),
"rest")
+ .config(
+ String.format("spark.sql.catalog.%s.uri", catalogName),
+ endpoints.catalogApiEndpoint().toString())
+ .config(String.format("spark.sql.catalog.%s.warehouse", catalogName),
catalogName)
+ .config(String.format("spark.sql.catalog.%s.scope", catalogName),
"PRINCIPAL_ROLE:ALL")
+ .config(
+ String.format("spark.sql.catalog.%s.header.realm", catalogName),
endpoints.realmId())
+ .config(String.format("spark.sql.catalog.%s.token", catalogName),
sparkToken)
+ .config(String.format("spark.sql.catalog.%s.s3.access-key-id",
catalogName), "fakekey")
+ .config(
+ String.format("spark.sql.catalog.%s.s3.secret-access-key",
catalogName), "fakesecret")
+ .config(String.format("spark.sql.catalog.%s.s3.region", catalogName),
"us-west-2")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar")
+ // for intial integration test have disabled for now, to revisit
enabling in future
+ .config("hoodie.metadata.enable", "false");
+ }
+
+ private String defaultNs;
+ private String tableRootDir;
+
+ private String getTableLocation(String tableName) {
+ return String.format("%s/%s", tableRootDir, tableName);
+ }
+
+ private String getTableNameWithRandomSuffix() {
+ return generateName("huditb");
+ }
+
+ @BeforeEach
+ public void createDefaultResources(@TempDir Path tempDir) {
+ spark.sparkContext().setLogLevel("INFO");
+ defaultNs = generateName("hudi");
+ // create a default namespace
+ sql("CREATE NAMESPACE %s", defaultNs);
+ sql("USE NAMESPACE %s", defaultNs);
+ tableRootDir =
+
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+ }
+
+ @AfterEach
+ public void cleanupHudiData() {
+ // clean up hudi data
+ File dirToDelete = new File(tableRootDir);
+ FileUtils.deleteQuietly(dirToDelete);
+ sql("DROP NAMESPACE %s", defaultNs);
+ }
+
+ @Test
+ public void testBasicTableOperations() {
+ // create a regular hudi table
+ String huditb1 = "huditb1";
+ sql(
+ "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
+ huditb1, getTableLocation(huditb1));
+ sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
+ List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY
id DESC", huditb1);
+ assertThat(results.size()).isEqualTo(1);
+ assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+ // create a hudi table with partition
+ String huditb2 = "huditb2";
+ sql(
+ "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI
PARTITIONED BY (country) LOCATION '%s'",
+ huditb2, getTableLocation(huditb2));
+ sql(
+ "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'),
('yan', 16, 'CHINA')",
+ huditb2);
+ results = sql("SELECT name, country FROM %s ORDER BY age", huditb2);
+ assertThat(results.size()).isEqualTo(3);
+ assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
+ assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+ assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+ // verify the partition dir is created
+ List<String> subDirs = listDirs(getTableLocation(huditb2));
+ assertThat(subDirs).contains(".hoodie", "country=CHINA", "country=US");
+
+ // test listTables
+ List<Object[]> tables = sql("SHOW TABLES");
+ assertThat(tables.size()).isEqualTo(2);
+ assertThat(tables)
+ .contains(
+ new Object[] {defaultNs, huditb1, false}, new Object[] {defaultNs,
huditb2, false});
+
+ sql("DROP TABLE %s", huditb1);
+ sql("DROP TABLE %s", huditb2);
+ tables = sql("SHOW TABLES");
+ assertThat(tables.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testUnsupportedAlterTableOperations() {
+ String huditb = getTableNameWithRandomSuffix();
+ sql(
+ "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI
PARTITIONED BY (country) LOCATION '%s'",
+ huditb, getTableLocation(huditb));
+
+ // ALTER TABLE ... RENAME TO ... fails
+ assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_hudi", huditb))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // ALTER TABLE ... SET LOCATION ... fails
+ assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION
'/tmp/new/path'", huditb))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ sql("DROP TABLE %s", huditb);
+ }
+
+ @Test
+ public void testUnsupportedTableCreateOperations() {
+ String huditb = getTableNameWithRandomSuffix();
+ // create hudi table with no location
+ assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING
HUDI", huditb))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // CTAS fails
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS
id",
+ huditb, getTableLocation(huditb)))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+}
Review Comment:
Is there a test here that actually checks the tables are written as Hudi?
##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkHudiIT extends SparkIntegrationBase {
+
+ @Override
+ protected SparkSession.Builder withCatalog(SparkSession.Builder builder,
String catalogName) {
+ return builder
+ .config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .config(
+ "spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ .config(
+ String.format("spark.sql.catalog.%s", catalogName),
+ "org.apache.polaris.spark.SparkCatalog")
+ .config("spark.sql.warehouse.dir", warehouseDir.toString())
+ .config(String.format("spark.sql.catalog.%s.type", catalogName),
"rest")
+ .config(
+ String.format("spark.sql.catalog.%s.uri", catalogName),
+ endpoints.catalogApiEndpoint().toString())
+ .config(String.format("spark.sql.catalog.%s.warehouse", catalogName),
catalogName)
+ .config(String.format("spark.sql.catalog.%s.scope", catalogName),
"PRINCIPAL_ROLE:ALL")
+ .config(
+ String.format("spark.sql.catalog.%s.header.realm", catalogName),
endpoints.realmId())
+ .config(String.format("spark.sql.catalog.%s.token", catalogName),
sparkToken)
+ .config(String.format("spark.sql.catalog.%s.s3.access-key-id",
catalogName), "fakekey")
+ .config(
+ String.format("spark.sql.catalog.%s.s3.secret-access-key",
catalogName), "fakesecret")
+ .config(String.format("spark.sql.catalog.%s.s3.region", catalogName),
"us-west-2")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar")
+ // for intial integration test have disabled for now, to revisit
enabling in future
+ .config("hoodie.metadata.enable", "false");
+ }
+
+ private String defaultNs;
+ private String tableRootDir;
+
+ private String getTableLocation(String tableName) {
+ return String.format("%s/%s", tableRootDir, tableName);
+ }
+
+ private String getTableNameWithRandomSuffix() {
+ return generateName("huditb");
+ }
+
+ @BeforeEach
+ public void createDefaultResources(@TempDir Path tempDir) {
+ spark.sparkContext().setLogLevel("INFO");
+ defaultNs = generateName("hudi");
+ // create a default namespace
+ sql("CREATE NAMESPACE %s", defaultNs);
+ sql("USE NAMESPACE %s", defaultNs);
+ tableRootDir =
+
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+ }
+
+ @AfterEach
+ public void cleanupHudiData() {
+ // clean up hudi data
+ File dirToDelete = new File(tableRootDir);
+ FileUtils.deleteQuietly(dirToDelete);
+ sql("DROP NAMESPACE %s", defaultNs);
+ }
+
+ @Test
+ public void testBasicTableOperations() {
+ // create a regular hudi table
+ String huditb1 = "huditb1";
+ sql(
+ "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
+ huditb1, getTableLocation(huditb1));
+ sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
+ List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY
id DESC", huditb1);
+ assertThat(results.size()).isEqualTo(1);
+ assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+ // create a hudi table with partition
+ String huditb2 = "huditb2";
+ sql(
+ "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI
PARTITIONED BY (country) LOCATION '%s'",
+ huditb2, getTableLocation(huditb2));
+ sql(
+ "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'),
('yan', 16, 'CHINA')",
+ huditb2);
+ results = sql("SELECT name, country FROM %s ORDER BY age", huditb2);
+ assertThat(results.size()).isEqualTo(3);
+ assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
+ assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+ assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+ // verify the partition dir is created
+ List<String> subDirs = listDirs(getTableLocation(huditb2));
+ assertThat(subDirs).contains(".hoodie", "country=CHINA", "country=US");
+
+ // test listTables
+ List<Object[]> tables = sql("SHOW TABLES");
+ assertThat(tables.size()).isEqualTo(2);
+ assertThat(tables)
+ .contains(
+ new Object[] {defaultNs, huditb1, false}, new Object[] {defaultNs,
huditb2, false});
+
+ sql("DROP TABLE %s", huditb1);
+ sql("DROP TABLE %s", huditb2);
+ tables = sql("SHOW TABLES");
+ assertThat(tables.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testUnsupportedAlterTableOperations() {
+ String huditb = getTableNameWithRandomSuffix();
+ sql(
+ "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI
PARTITIONED BY (country) LOCATION '%s'",
+ huditb, getTableLocation(huditb));
+
+ // ALTER TABLE ... RENAME TO ... fails
+ assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_hudi", huditb))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // ALTER TABLE ... SET LOCATION ... fails
+ assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION
'/tmp/new/path'", huditb))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ sql("DROP TABLE %s", huditb);
+ }
+
+ @Test
+ public void testUnsupportedTableCreateOperations() {
+ String huditb = getTableNameWithRandomSuffix();
+ // create hudi table with no location
+ assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING
HUDI", huditb))
Review Comment:
Why should this fail?
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiCatalogUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import java.util.Map;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for Hudi-specific catalog operations, particularly namespace
synchronization
+ * between Polaris catalog and Spark session catalog for Hudi compatibility.
+ *
+ * <p>Hudi table loading requires namespace validation through the session
catalog, but only the
+ * Polaris catalog contains the actual namespace metadata. This class provides
methods to
+ * synchronize namespace operations to maintain consistency between catalogs.
+ */
+public class HudiCatalogUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(HudiCatalogUtils.class);
+
+ /**
+ * Synchronizes namespace creation to session catalog when Hudi extension is
enabled. This ensures
+ * session catalog metadata stays consistent with Polaris catalog for
comprehensive Hudi
Review Comment:
What does `comprehensive` actually mean?
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -64,9 +92,13 @@ public static boolean
isTableWithSparkManagedLocation(Map<String, String> proper
* Load spark table using DataSourceV2.
*
* @return V2Table if DataSourceV2 is available for the table format. For
delta table, it returns
- * DeltaTableV2.
+ * DeltaTableV2. For hudi it should return HoodieInternalV2Table.
*/
- public static Table loadSparkTable(GenericTable genericTable) {
+ public static Table loadSparkTable(GenericTable genericTable, Identifier
identifier) {
+ if (genericTable.getFormat().equalsIgnoreCase("hudi")) {
+ // hudi does not implement table provider interface, so will need to
catch it
Review Comment:
Catch it?
It's not clear to me why this is different from Delta.
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiCatalogUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import java.util.Map;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for Hudi-specific catalog operations, particularly namespace
synchronization
+ * between Polaris catalog and Spark session catalog for Hudi compatibility.
+ *
+ * <p>Hudi table loading requires namespace validation through the session
catalog, but only the
+ * Polaris catalog contains the actual namespace metadata. This class provides
methods to
+ * synchronize namespace operations to maintain consistency between catalogs.
+ */
+public class HudiCatalogUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(HudiCatalogUtils.class);
+
+ /**
+ * Synchronizes namespace creation to session catalog when Hudi extension is
enabled. This ensures
+ * session catalog metadata stays consistent with Polaris catalog for
comprehensive Hudi
+ * compatibility.
+ *
+ * @param namespace The namespace to create
+ * @param metadata The namespace metadata properties
+ */
+ public static void createNamespace(String[] namespace, Map<String, String>
metadata) {
+ if (!PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ return;
+ }
+
+ // Sync namespace with filtered metadata to session catalog only when Hudi
is enabled
+ // This is needed because Hudi table loading uses the spark session catalog
+ // to validate namespace existence and access metadata properties.
+ // Reserved properties (owner, location, comment) are automatically
filtered out.
+ try {
+ SparkSession spark = SparkSession.active();
+ String ns = String.join(".", namespace);
+
+ // Build CREATE NAMESPACE SQL with metadata properties (filtered for
reserved properties)
+ String createSql = String.format("CREATE NAMESPACE IF NOT EXISTS
spark_catalog.%s", ns);
Review Comment:
This doesn't look correct; why do we need to keep the spark catalog in sync
with the actual Polaris catalog?
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java:
##########
@@ -68,7 +68,7 @@ public Table loadTable(Identifier identifier) throws
NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
- return PolarisCatalogUtils.loadSparkTable(genericTable);
+ return PolarisCatalogUtils.loadSparkTable(genericTable, identifier);
Review Comment:
It's not clear to me why the interface had to change. Is this related to the
namespace synchronization?
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -182,7 +194,11 @@ public Table alterTable(Identifier ident, TableChange...
changes) throws NoSuchT
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET
FILEFORMAT.
TableCatalog deltaCatalog =
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
+ } else if (PolarisCatalogUtils.useHudi(provider)) {
+ TableCatalog hudiCatalog =
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
Review Comment:
The same comment as above applies here, right? Let's restructure to make that
clearer.
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -270,18 +286,24 @@ public Map<String, String> loadNamespaceMetadata(String[]
namespace)
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
+ HudiCatalogUtils.createNamespace(namespace, metadata);
}
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
+ HudiCatalogUtils.alterNamespace(namespace, changes);
}
@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
- return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
+ boolean result = this.icebergsSparkCatalog.dropNamespace(namespace,
cascade);
+ if (result) {
Review Comment:
Why would `icebergsSparkCatalog.dropNamespace` returning `true` mean this is
a Hudi namespace?
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiCatalogUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import java.util.Map;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for Hudi-specific catalog operations, particularly namespace
synchronization
+ * between Polaris catalog and Spark session catalog for Hudi compatibility.
+ *
+ * <p>Hudi table loading requires namespace validation through the session
catalog, but only the
+ * Polaris catalog contains the actual namespace metadata. This class provides
methods to
+ * synchronize namespace operations to maintain consistency between catalogs.
+ */
+public class HudiCatalogUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(HudiCatalogUtils.class);
+
+ /**
+ * Synchronizes namespace creation to session catalog when Hudi extension is
enabled. This ensures
+ * session catalog metadata stays consistent with Polaris catalog for
comprehensive Hudi
+ * compatibility.
+ *
+ * @param namespace The namespace to create
+ * @param metadata The namespace metadata properties
+ */
+ public static void createNamespace(String[] namespace, Map<String, String>
metadata) {
+ if (!PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ return;
+ }
+
+ // Sync namespace with filtered metadata to session catalog only when Hudi
is enabled
+ // This is needed because Hudi table loading uses the spark session catalog
+ // to validate namespace existence and access metadata properties.
+ // Reserved properties (owner, location, comment) are automatically
filtered out.
+ try {
+ SparkSession spark = SparkSession.active();
+ String ns = String.join(".", namespace);
+
+ // Build CREATE NAMESPACE SQL with metadata properties (filtered for
reserved properties)
+ String createSql = String.format("CREATE NAMESPACE IF NOT EXISTS
spark_catalog.%s", ns);
+ String propertiesClause =
PolarisCatalogUtils.formatPropertiesForSQL(metadata);
+ createSql += propertiesClause;
+
+ LOG.debug("Syncing namespace to session catalog with SQL: {}",
createSql);
+ spark.sql(createSql);
+ LOG.debug("Successfully synced namespace {} to session catalog", ns);
+
+ } catch (UnsupportedOperationException e) {
+ String msg =
+ String.format(
+ "Session catalog does not support namespace operations, but Hudi
extension requires "
+ + "namespace synchronization. Cannot create namespace: %s",
+ String.join(".", namespace));
+ LOG.error(msg);
+ throw new RuntimeException(msg, e);
+ } catch (Exception e) {
+ handleNamespaceSyncError(namespace, e);
+ }
+ }
+
+ /**
+ * Synchronizes namespace alterations to session catalog when Hudi extension
is enabled. Applies
+ * both namespace existence and property changes via SQL commands for Hudi
compatibility.
+ *
+ * @param namespace The namespace to alter
+ * @param changes The namespace changes to apply
+ */
+ public static void alterNamespace(String[] namespace, NamespaceChange...
changes) {
+ if (!PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ return;
+ }
+
+ // For Hudi compatibility, sync namespace changes to session catalog
+ // Apply both namespace existence and property changes via SQL commands
+ try {
+ SparkSession spark = SparkSession.active();
+ String ns = String.join(".", namespace);
+
+ // Ensure namespace exists first
+ spark.sql(String.format("CREATE NAMESPACE IF NOT EXISTS
spark_catalog.%s", ns));
+
+ // Apply namespace changes via SQL
+ for (NamespaceChange change : changes) {
+ String sql = PolarisCatalogUtils.convertNamespaceChangeToSQL(ns,
change);
+ if (sql != null) {
+ spark.sql(sql);
+ }
+ }
+ } catch (UnsupportedOperationException e) {
+ String msg =
+ String.format(
+ "Session catalog does not support namespace operations, but Hudi
extension requires "
+ + "namespace synchronization. Cannot alter namespace: %s",
+ String.join(".", namespace));
+ LOG.error(msg);
+ throw new RuntimeException(msg, e);
+ } catch (Exception e) {
+ handleNamespaceSyncError(namespace, "alter", e, false);
+ }
+ }
+
+ /**
+ * Synchronizes namespace drop to session catalog when Hudi extension is
enabled. This maintains
+ * consistency between catalogs for Hudi table operations.
+ *
+ * @param namespace The namespace to drop
+ * @param cascade Whether to cascade the drop operation
+ */
+ public static void dropNamespace(String[] namespace, boolean cascade) {
+ if (!PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ return;
+ }
+
+ // Sync namespace drop to session catalog only when Hudi is enabled
+ // This maintains consistency between catalogs for Hudi table operations
+ try {
+ SparkSession spark = SparkSession.active();
+ String ns = String.join(".", namespace);
+
+ // Build DROP NAMESPACE SQL with CASCADE/RESTRICT flag
+ String cascadeClause = cascade ? " CASCADE" : " RESTRICT";
+ String dropSql =
+ String.format("DROP NAMESPACE IF EXISTS spark_catalog.%s%s", ns,
cascadeClause);
+
+ spark.sql(dropSql);
+ } catch (UnsupportedOperationException e) {
+ String msg =
+ String.format(
+ "Session catalog does not support namespace operations, but Hudi
extension requires "
+ + "namespace synchronization. Cannot drop namespace: %s",
+ String.join(".", namespace));
+ LOG.error(msg);
+ throw new RuntimeException(msg, e);
+ } catch (Exception e) {
+ handleNamespaceSyncError(namespace, "drop", e, false);
+ }
+ }
+
+ /**
+ * Handles namespace synchronization errors with clear error messages and
appropriate actions.
+ *
+ * @param namespace The namespace that failed to sync
+ * @param operation The operation being performed (for logging)
+ * @param e The exception that occurred
+ * @param throwOnError Whether to throw RuntimeException for critical errors
(vs log warning)
+ * @throws RuntimeException For critical errors when throwOnError is true
+ */
+ private static void handleNamespaceSyncError(
+ String[] namespace, String operation, Exception e, boolean throwOnError)
{
+ String errorMsg = e.getMessage();
Review Comment:
Necessity aside, this error handling looks quite brittle. You should
probably be traversing the cause chain and looking at exception types rather
than just doing string-matching on the message.
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -270,18 +286,24 @@ public Map<String, String> loadNamespaceMetadata(String[]
namespace)
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
+ HudiCatalogUtils.createNamespace(namespace, metadata);
Review Comment:
Why are we _always_ calling this?
##########
plugins/spark/v3.5/spark/build.gradle.kts:
##########
@@ -46,6 +46,47 @@ dependencies {
// TODO: extract a polaris-rest module as a thin layer for
// client to depends on.
implementation(project(":polaris-core")) { isTransitive = false }
+ implementation(project(":polaris-api-iceberg-service")) {
+ // exclude the iceberg dependencies, use the ones pulled
+ // by iceberg-core
+ exclude("org.apache.iceberg", "*")
+ // exclude all cloud and quarkus specific dependencies to avoid
+ // running into problems with signature files.
+ exclude("com.azure", "*")
+ exclude("software.amazon.awssdk", "*")
+ exclude("com.google.cloud", "*")
+ exclude("io.airlift", "*")
+ exclude("io.smallrye", "*")
+ exclude("io.smallrye.common", "*")
+ exclude("io.swagger", "*")
+ exclude("org.apache.commons", "*")
+ }
+ implementation(project(":polaris-api-catalog-service")) {
Review Comment:
Ditto: why is this new dependency needed for the implementation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]