This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new b479a2f44 Spark: Initial integration for hudi tables within Polaris (#1862)
b479a2f44 is described below

commit b479a2f4491dc002420b34c5b127be4fa1a47394
Author: Rahil C <[email protected]>
AuthorDate: Mon Nov 24 16:04:49 2025 -0800

    Spark: Initial integration for hudi tables within Polaris (#1862)
---
 .../apache/polaris/spark/PolarisSparkCatalog.java  |  14 +-
 .../org/apache/polaris/spark/SparkCatalog.java     |  15 +-
 .../org/apache/polaris/spark/utils/HudiHelper.java |  71 +++++++++
 .../polaris/spark/utils/PolarisCatalogUtils.java   |  93 ++++++++++--
 .../org/apache/polaris/spark/NoopHudiCatalog.java  |  38 +++++
 .../org/apache/polaris/spark/SparkCatalogTest.java | 165 ++++++++++++++-------
 .../polaris/spark/rest/DeserializationTest.java    |  30 ++--
 7 files changed, 347 insertions(+), 79 deletions(-)

diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
index 771c191c0..36ed872d3 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -71,7 +71,12 @@ public class PolarisSparkCatalog implements TableCatalog {
     try {
       GenericTable genericTable =
           this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
-      return PolarisCatalogUtils.loadSparkTable(genericTable);
+      // Currently Hudi supports Spark Datasource V1, therefore we return a V1Table
+      if (PolarisCatalogUtils.useHudi(genericTable.getFormat())) {
+        return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
+      } else {
+        return PolarisCatalogUtils.loadV2SparkTable(genericTable);
+      }
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(identifier);
     }
@@ -111,7 +116,12 @@ public class PolarisSparkCatalog implements TableCatalog {
               baseLocation,
               null,
               properties);
-      return PolarisCatalogUtils.loadSparkTable(genericTable);
+      // Currently Hudi supports Spark Datasource V1, therefore we return a V1Table
+      if (PolarisCatalogUtils.useHudi(format)) {
+        return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
+      } else {
+        return PolarisCatalogUtils.loadV2SparkTable(genericTable);
+      }
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(identifier);
     }
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index 26c1fbbf3..040638a47 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.spark.SupportsReplaceView;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.HudiHelper;
 import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -69,6 +70,7 @@ public class SparkCatalog
   @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
   @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
   @VisibleForTesting protected DeltaHelper deltaHelper = null;
+  @VisibleForTesting protected HudiHelper hudiHelper = null;
 
   @Override
   public String name() {
@@ -130,6 +132,7 @@ public class SparkCatalog
     this.catalogName = name;
     initRESTCatalog(name, options);
     this.deltaHelper = new DeltaHelper(options);
+    this.hudiHelper = new HudiHelper(options);
   }
 
   @Override
@@ -154,12 +157,16 @@ public class SparkCatalog
         throw new UnsupportedOperationException(
             "Create table without location key is not supported by Polaris. 
Please provide location or path on table creation.");
       }
-
       if (PolarisCatalogUtils.useDelta(provider)) {
         // For delta table, we load the delta catalog to help dealing with the
         // delta log creation.
         TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
         return deltaCatalog.createTable(ident, schema, transforms, properties);
+      } else if (PolarisCatalogUtils.useHudi(provider)) {
+        // For creating the hudi table, we load HoodieCatalog
+        // to create the .hoodie folder in cloud storage
+        TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+        return hudiCatalog.createTable(ident, schema, transforms, properties);
       } else {
         return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
       }
@@ -180,8 +187,12 @@ public class SparkCatalog
         //     using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
         TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
         return deltaCatalog.alterTable(ident, changes);
+      } else if (PolarisCatalogUtils.useHudi(provider)) {
+        TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+        return hudiCatalog.alterTable(ident, changes);
+      } else {
+        return this.polarisSparkCatalog.alterTable(ident);
       }
-      return this.polarisSparkCatalog.alterTable(ident);
     }
   }
 
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiHelper.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiHelper.java
new file mode 100644
index 000000000..105bad4c0
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/HudiHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.spark.utils;
+
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.polaris.spark.PolarisSparkCatalog;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class HudiHelper {
+  public static final String HUDI_CATALOG_IMPL_KEY = "hudi-catalog-impl";
+  private static final String DEFAULT_HUDI_CATALOG_CLASS =
+      "org.apache.spark.sql.hudi.catalog.HoodieCatalog";
+
+  private TableCatalog hudiCatalog = null;
+  private String hudiCatalogImpl = DEFAULT_HUDI_CATALOG_CLASS;
+
+  public HudiHelper(CaseInsensitiveStringMap options) {
+    if (options.get(HUDI_CATALOG_IMPL_KEY) != null) {
+      this.hudiCatalogImpl = options.get(HUDI_CATALOG_IMPL_KEY);
+    }
+  }
+
+  public TableCatalog loadHudiCatalog(PolarisSparkCatalog polarisSparkCatalog) {
+    if (this.hudiCatalog != null) {
+      return this.hudiCatalog;
+    }
+
+    DynConstructors.Ctor<TableCatalog> ctor;
+    try {
+      ctor = DynConstructors.builder(TableCatalog.class).impl(hudiCatalogImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize Hudi Catalog %s: %s", 
hudiCatalogImpl, e.getMessage()),
+          e);
+    }
+
+    try {
+      this.hudiCatalog = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize Hudi Catalog, %s does not implement 
TableCatalog.",
+              hudiCatalogImpl),
+          e);
+    }
+
+    // Set the Polaris Spark catalog as the delegate catalog of the Hudi catalog;
+    // it will be used in HoodieCatalog's loadTable.
+    ((DelegatingCatalogExtension) this.hudiCatalog).setDelegateCatalog(polarisSparkCatalog);
+    return this.hudiCatalog;
+  }
+}
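
For readers who want to try the new path end to end: the helper above reads the
"hudi-catalog-impl" catalog option (defaulting to
org.apache.spark.sql.hudi.catalog.HoodieCatalog), and table creation goes through the
Polaris Spark catalog with an explicit location. The sketch below is illustrative only
and is not part of this commit; the catalog name "polaris", the REST endpoint, the
namespace, and the table location are assumed placeholder values.

    import org.apache.spark.sql.SparkSession;

    public class PolarisHudiQuickstart {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .appName("polaris-hudi-quickstart")
                // Register the Polaris Spark catalog plugin under the assumed name "polaris".
                .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
                .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
                // Optional override read by HudiHelper; HoodieCatalog is already the default.
                .config(
                    "spark.sql.catalog.polaris.hudi-catalog-impl",
                    "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
                // Hudi SQL support needs the Hudi session extension on the classpath.
                .config(
                    "spark.sql.extensions",
                    "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                .getOrCreate();

        // Polaris requires a location (or path) when creating a generic table; the
        // SparkCatalog change above routes this statement to HoodieCatalog so the
        // .hoodie metadata folder is created under the given location (namespace "ns"
        // is assumed to exist already).
        spark.sql(
            "CREATE TABLE polaris.ns.hudi_tbl (id INT, data STRING) USING hudi "
                + "LOCATION 'file:///tmp/polaris/ns/hudi_tbl'");
      }
    }
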
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
index 98016b71f..5493f0dc3 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
@@ -29,14 +29,20 @@ import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.polaris.spark.rest.GenericTable;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.execution.datasources.DataSource;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import scala.Option;
 
 public class PolarisCatalogUtils {
+
   public static final String TABLE_PROVIDER_KEY = "provider";
   public static final String TABLE_PATH_KEY = "path";
 
@@ -50,6 +56,10 @@ public class PolarisCatalogUtils {
     return "delta".equalsIgnoreCase(provider);
   }
 
+  public static boolean useHudi(String provider) {
+    return "hudi".equalsIgnoreCase(provider);
+  }
+
   /**
    * For tables whose location is managed by Spark Session Catalog, there will be no location or
    * path in the properties.
@@ -61,16 +71,11 @@ public class PolarisCatalogUtils {
   }
 
   /**
-   * Load spark table using DataSourceV2.
-   *
-   * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
-   *     DeltaTableV2.
+   * Normalize table properties for loading Spark tables by ensuring the TABLE_PATH_KEY is properly
+   * set. DataSourceV2 requires the path property on table loading.
    */
-  public static Table loadSparkTable(GenericTable genericTable) {
-    SparkSession sparkSession = SparkSession.active();
-    TableProvider provider =
-        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
-            .get();
+  private static Map<String, String> normalizeTablePropertiesForLoadSparkTable(
+      GenericTable genericTable) {
     Map<String, String> properties = genericTable.getProperties();
     boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
     boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
@@ -87,10 +92,80 @@ public class PolarisCatalogUtils {
         tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
       }
     }
+    return tableProperties;
+  }
+
+  /**
+   * Load spark table using DataSourceV2.
+   *
+   * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
+   *     DeltaTableV2.
+   */
+  public static Table loadV2SparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
     return DataSourceV2Utils.getTableFromProvider(
         provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
   }
 
+  /**
+   * Return a Spark V1Table for formats that do not use DataSourceV2. Currently, this is being used
+   * for Hudi tables.
+   */
+  public static Table loadV1SparkTable(
+      GenericTable genericTable, Identifier identifier, String catalogName) {
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
+
+    // Need full identifier in order to construct CatalogTable
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), Option.apply(catalogName));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema information, partition columns,
+    // stats, etc.
+    // For now we will fill the parameters we have from the Polaris catalog, and let the underlying
+    // client resolve the rest within its catalog implementation.
+    org.apache.spark.sql.types.StructType emptySchema = new org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getFormat()),
+            emptyStringSeq,
+            scala.Option.empty(),
+            genericTable.getProperties().get(TableCatalog.PROP_OWNER),
+            System.currentTimeMillis(),
+            -1L,
+            "",
+            scalaOptions,
+            scala.Option.empty(),
+            scala.Option.empty(),
+            scala.Option.empty(),
+            emptyStringSeq,
+            false,
+            true,
+            scala.collection.immutable.Map$.MODULE$.empty(),
+            scala.Option.empty());
+
+    return new org.apache.spark.sql.connector.catalog.V1Table(catalogTable);
+  }
+
   /**
    * Get the catalogAuth field inside the RESTSessionCatalog used by Iceberg Spark Catalog use
    * reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopHudiCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopHudiCatalog.java
new file mode 100644
index 000000000..93862ea3c
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopHudiCatalog.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableChange;
+
+/**
+ * This is a fake hudi catalog class that is used for testing. This class is a noop class that
+ * directly passes all calls to the delegate CatalogPlugin configured as part of
+ * DelegatingCatalogExtension.
+ */
+public class NoopHudiCatalog extends DelegatingCatalogExtension {
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    return super.loadTable(ident);
+  }
+}
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 6aa4a3c08..125c6d1d5 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.HudiHelper;
 import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -58,6 +59,7 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.internal.SessionState;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -67,6 +69,9 @@ import org.mockito.Mockito;
 import scala.Option;
 
 public class SparkCatalogTest {
+  private static MockedStatic<SparkSession> mockedStaticSparkSession;
+  private static SparkSession mockedSession;
+
   private static class InMemoryIcebergSparkCatalog extends org.apache.iceberg.spark.SparkCatalog {
     private PolarisInMemoryCatalog inMemoryCatalog = null;
 
@@ -104,6 +109,7 @@ public class SparkCatalogTest {
       this.polarisSparkCatalog.initialize(name, options);
 
       this.deltaHelper = new DeltaHelper(options);
+      this.hudiHelper = new HudiHelper(options);
     }
   }
 
@@ -122,25 +128,50 @@ public class SparkCatalogTest {
     catalogConfig.put("cache-enabled", "false");
     catalogConfig.put(
         DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog");
+    catalogConfig.put(HudiHelper.HUDI_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopHudiCatalog");
     catalog = new InMemorySparkCatalog();
     Configuration conf = new Configuration();
-    try (MockedStatic<SparkSession> mockedStaticSparkSession =
-            Mockito.mockStatic(SparkSession.class);
-        MockedStatic<SparkUtil> mockedSparkUtil = Mockito.mockStatic(SparkUtil.class)) {
-      SparkSession mockedSession = Mockito.mock(SparkSession.class);
-      mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+
+    // Setup persistent SparkSession mock
+    mockedStaticSparkSession = Mockito.mockStatic(SparkSession.class);
+    mockedSession = Mockito.mock(SparkSession.class);
+    org.apache.spark.sql.RuntimeConfig mockedConfig =
+        Mockito.mock(org.apache.spark.sql.RuntimeConfig.class);
+    SparkContext mockedContext = Mockito.mock(SparkContext.class);
+    SessionState mockedSessionState = Mockito.mock(SessionState.class);
+    SQLConf mockedSQLConf = Mockito.mock(SQLConf.class);
+
+    mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+    Mockito.when(mockedSession.conf()).thenReturn(mockedConfig);
+    Mockito.when(mockedSession.sessionState()).thenReturn(mockedSessionState);
+    Mockito.when(mockedSessionState.conf()).thenReturn(mockedSQLConf);
+    Mockito.when(mockedConfig.get("spark.sql.extensions", null))
+        .thenReturn(
+            
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+                + "io.delta.sql.DeltaSparkSessionExtension"
+                + "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    Mockito.when(mockedConfig.get("spark.sql.warehouse.dir", 
"spark-warehouse"))
+        .thenReturn("/tmp/test-warehouse");
+    Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext);
+    Mockito.when(mockedContext.applicationId()).thenReturn("appId");
+    Mockito.when(mockedContext.sparkUser()).thenReturn("test-user");
+    Mockito.when(mockedContext.version()).thenReturn("3.5");
+
+    try (MockedStatic<SparkUtil> mockedSparkUtil = Mockito.mockStatic(SparkUtil.class)) {
       mockedSparkUtil
           .when(() -> SparkUtil.hadoopConfCatalogOverrides(mockedSession, catalogName))
           .thenReturn(conf);
-      SparkContext mockedContext = Mockito.mock(SparkContext.class);
-      Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext);
-      Mockito.when(mockedContext.applicationId()).thenReturn("appId");
-      Mockito.when(mockedContext.sparkUser()).thenReturn("test-user");
-      Mockito.when(mockedContext.version()).thenReturn("3.5");
 
       catalog.initialize(catalogName, new CaseInsensitiveStringMap(catalogConfig));
+      catalog.createNamespace(defaultNS, Maps.newHashMap());
+    }
+  }
+
+  @AfterEach
+  public void tearDown() {
+    if (mockedStaticSparkSession != null) {
+      mockedStaticSparkSession.close();
     }
-    catalog.createNamespace(defaultNS, Maps.newHashMap());
   }
 
   @Test
@@ -402,7 +433,7 @@ public class SparkCatalogTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {"delta", "csv"})
+  @ValueSource(strings = {"delta", "hudi", "csv"})
   void testCreateAndLoadGenericTable(String format) throws Exception {
     Identifier identifier = Identifier.of(defaultNS, "generic-test-table");
     createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, format);
@@ -418,7 +449,6 @@ public class SparkCatalogTest {
             () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
         .isInstanceOf(TableAlreadyExistsException.class);
 
-    // drop the iceberg table
     catalog.dropTable(identifier);
     assertThatThrownBy(() -> catalog.loadTable(identifier))
         .isInstanceOf(NoSuchTableException.class);
@@ -428,8 +458,9 @@ public class SparkCatalogTest {
   @Test
   void testMixedTables() throws Exception {
     // create two iceberg tables, and three non-iceberg tables
-    String[] tableNames = new String[] {"iceberg1", "iceberg2", "delta1", "csv1", "delta2"};
-    String[] tableFormats = new String[] {"iceberg", null, "delta", "csv", "delta"};
+    String[] tableNames =
+        new String[] {"iceberg1", "iceberg2", "delta1", "csv1", "delta2", "hudi1", "hudi2"};
+    String[] tableFormats = new String[] {"iceberg", null, "delta", "csv", "delta", "hudi", "hudi"};
     for (int i = 0; i < tableNames.length; i++) {
       Identifier identifier = Identifier.of(defaultNS, tableNames[i]);
       createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, tableFormats[i]);
@@ -445,8 +476,9 @@ public class SparkCatalogTest {
     // drop iceberg2 and delta1 table
     catalog.dropTable(Identifier.of(defaultNS, "iceberg2"));
     catalog.dropTable(Identifier.of(defaultNS, "delta2"));
+    catalog.dropTable(Identifier.of(defaultNS, "hudi2"));
 
-    String[] remainingTableNames = new String[] {"iceberg1", "delta1", "csv1"};
+    String[] remainingTableNames = new String[] {"iceberg1", "delta1", "csv1", "hudi1"};
     Identifier[] remainingTableIndents = catalog.listTables(defaultNS);
     assertThat(remainingTableIndents.length).isEqualTo(remainingTableNames.length);
     for (String name : remainingTableNames) {
@@ -465,12 +497,15 @@ public class SparkCatalogTest {
     String icebergTableName = "iceberg-table";
     String deltaTableName = "delta-table";
     String csvTableName = "csv-table";
+    String hudiTableName = "hudi-table";
     Identifier icebergIdent = Identifier.of(defaultNS, icebergTableName);
     Identifier deltaIdent = Identifier.of(defaultNS, deltaTableName);
     Identifier csvIdent = Identifier.of(defaultNS, csvTableName);
+    Identifier hudiIdent = Identifier.of(defaultNS, hudiTableName);
     createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
     createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
     createAndValidateGenericTableWithLoad(catalog, csvIdent, defaultSchema, "csv");
+    createAndValidateGenericTableWithLoad(catalog, hudiIdent, defaultSchema, "hudi");
 
     // verify alter iceberg table
     Table newIcebergTable =
@@ -488,17 +523,18 @@ public class SparkCatalogTest {
 
     // verify alter delta table is a no-op, and alter csv table throws an exception
     SQLConf conf = new SQLConf();
-    try (MockedStatic<SparkSession> mockedStaticSparkSession =
-            Mockito.mockStatic(SparkSession.class);
-        MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
+    try (MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
         MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
             Mockito.mockStatic(DataSourceV2Utils.class)) {
-      SparkSession mockedSession = Mockito.mock(SparkSession.class);
-      mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
       SessionState mockedState = Mockito.mock(SessionState.class);
       Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
       Mockito.when(mockedState.conf()).thenReturn(conf);
 
+      // Mock SessionCatalog for Hudi support
+      org.apache.spark.sql.catalyst.catalog.SessionCatalog mockedSessionCatalog =
+          Mockito.mock(org.apache.spark.sql.catalyst.catalog.SessionCatalog.class);
+      Mockito.when(mockedState.catalog()).thenReturn(mockedSessionCatalog);
+
       TableProvider deltaProvider = Mockito.mock(TableProvider.class);
       mockedStaticDS
           .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("delta"), 
Mockito.any()))
@@ -551,18 +587,21 @@ public class SparkCatalogTest {
   void testPurgeInvalidateTable() throws Exception {
     Identifier icebergIdent = Identifier.of(defaultNS, "iceberg-table");
     Identifier deltaIdent = Identifier.of(defaultNS, "delta-table");
+    Identifier hudiIdent = Identifier.of(defaultNS, "hudi-table");
     createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
     createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
-
+    createAndValidateGenericTableWithLoad(catalog, hudiIdent, defaultSchema, "hudi");
     // test invalidate table is a no op today
     catalog.invalidateTable(icebergIdent);
     catalog.invalidateTable(deltaIdent);
+    catalog.invalidateTable(hudiIdent);
 
     Identifier[] tableIdents = catalog.listTables(defaultNS);
-    assertThat(tableIdents.length).isEqualTo(2);
+    assertThat(tableIdents.length).isEqualTo(3);
 
     // verify purge tables drops the table
     catalog.purgeTable(deltaIdent);
+    catalog.purgeTable(hudiIdent);
     assertThat(catalog.listTables(defaultNS).length).isEqualTo(1);
 
     // purge iceberg table triggers file deletion
@@ -588,42 +627,60 @@ public class SparkCatalogTest {
     properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
     properties.put(
         TableCatalog.PROP_LOCATION,
-        String.format("file:///tmp/delta/path/to/table/%s/", 
identifier.name()));
-
-    SQLConf conf = new SQLConf();
-    try (MockedStatic<SparkSession> mockedStaticSparkSession =
-            Mockito.mockStatic(SparkSession.class);
-        MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
-        MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
-            Mockito.mockStatic(DataSourceV2Utils.class)) {
-      SparkSession mockedSession = Mockito.mock(SparkSession.class);
-      mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
-      SessionState mockedState = Mockito.mock(SessionState.class);
-      Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
-      Mockito.when(mockedState.conf()).thenReturn(conf);
+        String.format("file:///tmp/%s/path/to/table/%s/", format, 
identifier.name()));
 
-      TableProvider provider = Mockito.mock(TableProvider.class);
-      mockedStaticDS
-          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq(format), Mockito.any()))
-          .thenReturn(Option.apply(provider));
-      V1Table table = Mockito.mock(V1Table.class);
-      mockedStaticDSV2
-          .when(
-              () ->
-                  DataSourceV2Utils.getTableFromProvider(
-                      Mockito.eq(provider), Mockito.any(), Mockito.any()))
-          .thenReturn(table);
+    if (PolarisCatalogUtils.useIceberg(format)) {
       Table createdTable =
           sparkCatalog.createTable(identifier, schema, new Transform[0], properties);
       Table loadedTable = sparkCatalog.loadTable(identifier);
 
-      // verify the create and load table result
-      if (PolarisCatalogUtils.useIceberg(format)) {
-        // iceberg SparkTable is returned for iceberg tables
-        assertThat(createdTable).isInstanceOf(SparkTable.class);
-        assertThat(loadedTable).isInstanceOf(SparkTable.class);
-      } else {
-        // Spark V1 table is returned for non-iceberg tables
+      // verify iceberg SparkTable is returned for iceberg tables
+      assertThat(createdTable).isInstanceOf(SparkTable.class);
+      assertThat(loadedTable).isInstanceOf(SparkTable.class);
+    } else {
+      // For non-Iceberg tables, use mocking
+      try (MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
+          MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
+              Mockito.mockStatic(DataSourceV2Utils.class);
+          MockedStatic<PolarisCatalogUtils> mockedStaticUtils =
+              Mockito.mockStatic(PolarisCatalogUtils.class)) {
+
+        V1Table table = Mockito.mock(V1Table.class);
+
+        // Mock the routing utility methods
+        mockedStaticUtils
+            .when(() -> PolarisCatalogUtils.useHudi(Mockito.eq(format)))
+            .thenCallRealMethod();
+
+        if ("hudi".equalsIgnoreCase(format)) {
+          // For Hudi tables, mock the loadV1SparkTable method to return the mock table
+          mockedStaticUtils
+              .when(
+                  () ->
+                      PolarisCatalogUtils.loadV1SparkTable(
+                          Mockito.any(), Mockito.any(), Mockito.any()))
+              .thenReturn(table);
+        } else {
+          TableProvider provider = Mockito.mock(TableProvider.class);
+          mockedStaticDS
+              .when(() -> DataSource.lookupDataSourceV2(Mockito.eq(format), Mockito.any()))
+              .thenReturn(Option.apply(provider));
+          mockedStaticDSV2
+              .when(
+                  () ->
+                      DataSourceV2Utils.getTableFromProvider(
+                          Mockito.eq(provider), Mockito.any(), Mockito.any()))
+              .thenReturn(table);
+          mockedStaticUtils
+              .when(() -> PolarisCatalogUtils.loadV2SparkTable(Mockito.any()))
+              .thenCallRealMethod();
+        }
+
+        Table createdTable =
+            sparkCatalog.createTable(identifier, schema, new Transform[0], properties);
+        Table loadedTable = sparkCatalog.loadTable(identifier);
+
+        // verify Spark V1 table is returned for non-iceberg tables
         assertThat(createdTable).isInstanceOf(V1Table.class);
         assertThat(loadedTable).isInstanceOf(V1Table.class);
       }
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
index 0f7d3c99b..6c2bb99dc 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -22,6 +22,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Stream;
@@ -66,11 +69,11 @@ public class DeserializationTest {
   @ParameterizedTest
   @MethodSource("genericTableTestCases")
   public void testLoadGenericTableRESTResponse(
-      String baseLocation, String doc, Map<String, String> properties)
+      String baseLocation, String doc, Map<String, String> properties, String format)
       throws JsonProcessingException {
     GenericTable.Builder tableBuilder =
         GenericTable.builder()
-            .setFormat("delta")
+            .setFormat(format)
             .setName("test-table")
             .setProperties(properties)
             .setDoc(doc);
@@ -82,7 +85,7 @@ public class DeserializationTest {
     String json = mapper.writeValueAsString(response);
     LoadGenericTableRESTResponse deserializedResponse =
         mapper.readValue(json, LoadGenericTableRESTResponse.class);
-    assertThat(deserializedResponse.getTable().getFormat()).isEqualTo("delta");
+    assertThat(deserializedResponse.getTable().getFormat()).isEqualTo(format);
     assertThat(deserializedResponse.getTable().getName()).isEqualTo("test-table");
     assertThat(deserializedResponse.getTable().getDoc()).isEqualTo(doc);
     assertThat(deserializedResponse.getTable().getProperties().size()).isEqualTo(properties.size());
@@ -92,13 +95,13 @@ public class DeserializationTest {
   @ParameterizedTest
   @MethodSource("genericTableTestCases")
   public void testCreateGenericTableRESTRequest(
-      String baseLocation, String doc, Map<String, String> properties)
+      String baseLocation, String doc, Map<String, String> properties, String format)
       throws JsonProcessingException {
     CreateGenericTableRESTRequest request =
         new CreateGenericTableRESTRequest(
             CreateGenericTableRequest.builder()
                 .setName("test-table")
-                .setFormat("delta")
+                .setFormat(format)
                 .setDoc(doc)
                 .setBaseLocation(baseLocation)
                 .setProperties(properties)
@@ -107,7 +110,7 @@ public class DeserializationTest {
     CreateGenericTableRESTRequest deserializedRequest =
         mapper.readValue(json, CreateGenericTableRESTRequest.class);
     assertThat(deserializedRequest.getName()).isEqualTo("test-table");
-    assertThat(deserializedRequest.getFormat()).isEqualTo("delta");
+    assertThat(deserializedRequest.getFormat()).isEqualTo(format);
     assertThat(deserializedRequest.getDoc()).isEqualTo(doc);
     assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
     assertThat(deserializedRequest.getBaseLocation()).isEqualTo(baseLocation);
@@ -159,11 +162,14 @@ public class DeserializationTest {
     var properties = Maps.newHashMap();
     properties.put("location", "s3://path/to/table/");
     var baseLocation = "s3://path/to/table/";
-    return Stream.of(
-        Arguments.of(null, doc, properties),
-        Arguments.of(baseLocation, doc, properties),
-        Arguments.of(null, null, Maps.newHashMap()),
-        Arguments.of(baseLocation, doc, Maps.newHashMap()),
-        Arguments.of(baseLocation, null, properties));
+    List<Arguments> args = new ArrayList<>();
+    for (String format : Arrays.asList("delta", "hudi")) {
+      args.add(Arguments.of(null, doc, properties, format));
+      args.add(Arguments.of(baseLocation, doc, properties, format));
+      args.add(Arguments.of(null, null, Maps.newHashMap(), format));
+      args.add(Arguments.of(baseLocation, doc, Maps.newHashMap(), format));
+      args.add(Arguments.of(baseLocation, null, properties, format));
+    }
+    return args.stream();
   }
 }

