This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 74383b1e4 test: Add Some Spark Client Tests and Update Documentation on Generic Tables (#3152)
74383b1e4 is described below

commit 74383b1e4847236826ed03aa5f8603612b634de0
Author: Adam Christian <[email protected]>
AuthorDate: Mon Dec 1 15:20:17 2025 -0500

    test: Add Some Spark Client Tests and Update Documentation on Generic Tables (#3152)
---
 plugins/spark/README.md                            |  35 ++-
 .../apache/polaris/spark/utils/DeltaHelper.java    |  13 +
 .../polaris/spark/PolarisRESTCatalogTest.java      | 345 +++++++++++++++++++++
 .../polaris/spark/utils/DeltaHelperTest.java       | 115 +++++++
 .../spark/utils/PolarisCatalogUtilsTest.java       |  93 ++++++
 site/content/in-dev/unreleased/generic-table.md    |  30 +-
 .../in-dev/unreleased/polaris-spark-client.md      |  37 ++-
 7 files changed, 622 insertions(+), 46 deletions(-)

diff --git a/plugins/spark/README.md b/plugins/spark/README.md
index 1bdfe3dd7..a43c9c376 100644
--- a/plugins/spark/README.md
+++ b/plugins/spark/README.md
@@ -21,12 +21,16 @@
 
 The Polaris Spark plugin provides a SparkCatalog class, which communicates with the Polaris
 REST endpoints, and provides implementations for Apache Spark's
-[TableCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java),
-[ViewCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java) classes.
-[SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java),
+- [TableCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java)
+- [ViewCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java)
+- [SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java)
 
-Right now, the plugin only provides support for Spark 3.5, Scala version 2.12 and 2.13,
-and depends on iceberg-spark-runtime 1.9.1.
+Right now, the plugin only provides support for Spark 3.5, Scala version 2.12 and 2.13, and depends on iceberg-spark-runtime 1.9.1.
+
+The Polaris Spark client supports catalog management for both Iceberg and Delta tables. It routes all Iceberg table
+requests to the Iceberg REST endpoints and routes all Delta table requests to the Generic Table REST endpoints.
+
+The Spark Client requires at least delta 3.2.1 to work with Delta tables, which requires at least Apache Spark 3.5.3.
 
 # Start Spark with local Polaris service using the Polaris Spark plugin
 The following command starts a Polaris server for local testing, it runs on localhost:8181 with default
@@ -112,15 +116,16 @@ bin/spark-shell \
 --conf spark.sql.sources.useV1SourceList=''
 ```
 
-# Limitations
-The Polaris Spark client supports catalog management for both Iceberg and Delta tables, it routes all Iceberg table
-requests to the Iceberg REST endpoints, and routes all Delta table requests to the Generic Table REST endpoints.
+# Current Limitations
+The following describes the current limitations of the Polaris Spark client:
 
-The Spark Client requires at least delta 3.2.1 to work with Delta tables, which requires at least Apache Spark 3.5.3.
-Following describes the current functionality limitations of the Polaris Spark client:
-1) Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `Dataframe`
+## General Limitations
+1. The Polaris Spark client only supports Iceberg and Delta tables. It does not support other table formats like CSV, JSON, etc.
+2. Generic tables (non-Iceberg tables) do not currently support credential vending.
+
+## Delta Table Limitations
+1. Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `Dataframe`
    is also not supported, since it relies on the CTAS support.
-2) Create a Delta table without explicit location is not supported.
-3) Rename a Delta table is not supported.
-4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
-5) For other non-Iceberg tables like csv, it is not supported today.
+2. Creating a Delta table without an explicit location is not supported.
+3. Renaming a Delta table is not supported.
+4. ALTER TABLE ... SET LOCATION is not supported for Delta tables.
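As a side note on the README text above, which says Iceberg table requests go to the Iceberg REST endpoints and Delta table requests to the Generic Table REST endpoints, the routing rule can be sketched as a minimal dispatcher. This is an illustrative sketch only, not the plugin's actual code; `route` and `Endpoint` are hypothetical names:

```java
// Illustrative sketch of the README's routing rule (hypothetical names, not
// Polaris source): Iceberg requests go to the Iceberg REST endpoints, Delta
// requests go to the Generic Table REST endpoints.
public class RequestRouting {
  enum Endpoint { ICEBERG_REST, GENERIC_TABLE_REST }

  static Endpoint route(String provider) {
    if (provider.equalsIgnoreCase("iceberg")) return Endpoint.ICEBERG_REST;
    if (provider.equalsIgnoreCase("delta")) return Endpoint.GENERIC_TABLE_REST;
    // Other formats (csv, json, ...) are unsupported per the README limitations.
    throw new UnsupportedOperationException("Unsupported table format: " + provider);
  }

  public static void main(String[] args) {
    System.out.println(route("iceberg")); // ICEBERG_REST
    System.out.println(route("DeLta"));   // GENERIC_TABLE_REST
  }
}
```

The case-insensitive match mirrors the provider-detection tests later in this commit.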
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
index 297438424..09316ba12 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
@@ -28,6 +28,19 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class for integrating Delta table functionality with Polaris Spark Catalog.
+ *
+ * <p>This class is responsible for dynamically loading and configuring a Delta Catalog
+ * implementation to work with Polaris. It sets up the Delta Catalog as a delegating catalog
+ * extension with Polaris Spark Catalog as the delegate, enabling Delta table operations through
+ * Polaris.
+ *
+ * <p>The class uses reflection to configure the Delta Catalog to behave identically to Unity
+ * Catalog, as the current Delta Catalog implementation is hardcoded for Unity Catalog. This is a
+ * temporary workaround until Delta extends support for other catalog implementations (see
+ * https://github.com/delta-io/delta/issues/4306).
+ */
 public class DeltaHelper {
   private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class);
 
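The javadoc above mentions a reflection-based workaround for the hardcoded Unity Catalog behavior. A minimal, self-contained sketch of that mechanism follows, with a stand-in `FakeDeltaCatalog` class; the real helper targets Delta's actual catalog class instead:

```java
import java.lang.reflect.Field;

// Stand-in for Delta's catalog class; the real DeltaHelper targets the actual
// Delta Catalog implementation, which hardcodes an isUnityCatalog flag.
class FakeDeltaCatalog {
  private boolean isUnityCatalog = false;
}

public class ReflectionWorkaround {
  public static void main(String[] args) throws Exception {
    FakeDeltaCatalog catalog = new FakeDeltaCatalog();
    // Flip the private flag via reflection, as described in the javadoc above.
    Field field = FakeDeltaCatalog.class.getDeclaredField("isUnityCatalog");
    field.setAccessible(true);
    field.setBoolean(catalog, true);
    System.out.println(field.getBoolean(catalog)); // true
  }
}
```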
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java
new file mode 100644
index 000000000..ee27dc84e
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.spark.rest.GenericTable;
+import org.apache.polaris.spark.rest.ListGenericTablesRESTResponse;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class PolarisRESTCatalogTest {
+
+  private RESTClient mockClient;
+  private OAuth2Util.AuthSession mockAuthSession;
+  private PolarisRESTCatalog catalog;
+
+  @BeforeEach
+  public void setup() {
+    mockClient = mock(RESTClient.class);
+    mockAuthSession = mock(OAuth2Util.AuthSession.class);
+    when(mockAuthSession.headers()).thenReturn(ImmutableMap.of("Authorization", "Bearer token"));
+    when(mockClient.withAuthSession(any())).thenReturn(mockClient);
+
+    catalog = new PolarisRESTCatalog(config -> mockClient);
+  }
+
+  @Test
+  public void testInitializeWithDefaultEndpoints() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181");
+
+    catalog.initialize(properties, mockAuthSession);
+
+    verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+  }
+
+  @Test
+  public void testInitializeWithCustomEndpoints() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .withEndpoints(
+                ImmutableList.of(
+                    PolarisEndpoints.V1_LIST_GENERIC_TABLES,
+                    PolarisEndpoints.V1_CREATE_GENERIC_TABLE))
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181");
+
+    catalog.initialize(properties, mockAuthSession);
+
+    verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+  }
+
+  @Test
+  public void testInitializeWithPageSize() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "http://localhost:8181",
+            PolarisRESTCatalog.REST_PAGE_SIZE,
+            "10");
+
+    catalog.initialize(properties, mockAuthSession);
+
+    verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+  }
+
+  @Test
+  public void testInitializeWithInvalidPageSize() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "http://localhost:8181",
+            PolarisRESTCatalog.REST_PAGE_SIZE,
+            "-1");
+
+    assertThatThrownBy(() -> catalog.initialize(properties, mockAuthSession))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("must be a positive integer");
+  }
+
+  @Test
+  public void testInitializeWithNullConfig() {
+    assertThatThrownBy(() -> catalog.initialize(null, mockAuthSession))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid configuration: null");
+  }
+
+  @Test
+  public void testListGenericTables() {
+    initializeCatalog();
+
+    Namespace namespace = Namespace.of("test_ns");
+    TableIdentifier table1 = TableIdentifier.of(namespace, "table1");
+    TableIdentifier table2 = TableIdentifier.of(namespace, "table2");
+
+    ListGenericTablesRESTResponse response =
+        new ListGenericTablesRESTResponse(null, ImmutableSet.of(table1, table2));
+
+    when(mockClient.get(any(), anyMap(), eq(ListGenericTablesRESTResponse.class), anyMap(), any()))
+        .thenReturn(response);
+
+    List<TableIdentifier> tables = catalog.listGenericTables(namespace);
+
+    assertThat(tables).hasSize(2);
+    assertThat(tables).contains(table1, table2);
+  }
+
+  @Test
+  public void testListGenericTablesWithPagination() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI, "http://localhost:8181", PolarisRESTCatalog.REST_PAGE_SIZE, "2");
+
+    catalog.initialize(properties, mockAuthSession);
+
+    Namespace namespace = Namespace.of("test_ns");
+    TableIdentifier table1 = TableIdentifier.of(namespace, "table1");
+    TableIdentifier table2 = TableIdentifier.of(namespace, "table2");
+    TableIdentifier table3 = TableIdentifier.of(namespace, "table3");
+
+    ListGenericTablesRESTResponse response1 =
+        new ListGenericTablesRESTResponse("page2", ImmutableSet.of(table1, table2));
+    ListGenericTablesRESTResponse response2 =
+        new ListGenericTablesRESTResponse(null, ImmutableSet.of(table3));
+
+    when(mockClient.get(any(), anyMap(), eq(ListGenericTablesRESTResponse.class), anyMap(), any()))
+        .thenReturn(response1, response2);
+
+    List<TableIdentifier> tables = catalog.listGenericTables(namespace);
+
+    assertThat(tables).hasSize(3);
+    assertThat(tables).contains(table1, table2, table3);
+  }
+
+  @Test
+  public void testCreateGenericTable() {
+    initializeCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+    GenericTable table =
+        GenericTable.builder()
+            .setName("test_table")
+            .setFormat("delta")
+            .setBaseLocation("s3://bucket/path")
+            .setDoc("Test table")
+            .setProperties(ImmutableMap.of("key", "value"))
+            .build();
+
+    LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+
+    when(mockClient.post(any(), any(), eq(LoadGenericTableRESTResponse.class), anyMap(), any()))
+        .thenReturn(response);
+
+    GenericTable result =
+        catalog.createGenericTable(
+            identifier, "delta", "s3://bucket/path", "Test table", ImmutableMap.of("key", "value"));
+
+    assertThat(result.getName()).isEqualTo("test_table");
+    assertThat(result.getFormat()).isEqualTo("delta");
+    assertThat(result.getBaseLocation()).isEqualTo("s3://bucket/path");
+  }
+
+  @Test
+  public void testLoadGenericTable() {
+    initializeCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+    GenericTable table = GenericTable.builder().setName("test_table").setFormat("delta").build();
+
+    LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+
+    when(mockClient.get(any(), any(), eq(LoadGenericTableRESTResponse.class), anyMap(), any()))
+        .thenReturn(response);
+
+    GenericTable result = catalog.loadGenericTable(identifier);
+
+    assertThat(result.getName()).isEqualTo("test_table");
+    assertThat(result.getFormat()).isEqualTo("delta");
+  }
+
+  @Test
+  public void testDropGenericTableSuccess() {
+    initializeCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+
+    when(mockClient.delete(any(), any(), anyMap(), any())).thenReturn(null);
+
+    boolean result = catalog.dropGenericTable(identifier);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void testDropGenericTableNotFound() {
+    initializeCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+
+    when(mockClient.delete(any(), any(), anyMap(), any()))
+        .thenThrow(new NoSuchTableException("Table not found"));
+
+    boolean result = catalog.dropGenericTable(identifier);
+
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void testFetchConfigWithWarehouseLocation() {
+    RESTClient client = mock(RESTClient.class);
+    Map<String, String> headers = ImmutableMap.of("Authorization", "Bearer token");
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "http://localhost:8181",
+            CatalogProperties.WAREHOUSE_LOCATION,
+            "s3://warehouse");
+
+    ConfigResponse expectedResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .build();
+
+    when(client.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(expectedResponse);
+
+    ConfigResponse response = PolarisRESTCatalog.fetchConfig(client, headers, properties);
+
+    assertThat(response).isNotNull();
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<Map<String, String>> queryParamsCaptor = ArgumentCaptor.forClass(Map.class);
+    verify(client)
+        .get(any(), queryParamsCaptor.capture(), eq(ConfigResponse.class), anyMap(), any());
+
+    Map<String, String> capturedParams = queryParamsCaptor.getValue();
+    assertThat(capturedParams)
+        .containsEntry(CatalogProperties.WAREHOUSE_LOCATION, "s3://warehouse");
+  }
+
+  private void initializeCatalog() {
+    ConfigResponse configResponse =
+        ConfigResponse.builder()
+            .withDefaults(ImmutableMap.of())
+            .withOverrides(ImmutableMap.of())
+            .withEndpoints(
+                ImmutableList.of(
+                    PolarisEndpoints.V1_LIST_GENERIC_TABLES,
+                    PolarisEndpoints.V1_CREATE_GENERIC_TABLE,
+                    PolarisEndpoints.V1_LOAD_GENERIC_TABLE,
+                    PolarisEndpoints.V1_DELETE_GENERIC_TABLE))
+            .build();
+
+    when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+        .thenReturn(configResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181");
+
+    catalog.initialize(properties, mockAuthSession);
+  }
+}
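`testListGenericTablesWithPagination` above exercises page-token pagination: the client keeps fetching pages until the server returns a null next-page token. A minimal, self-contained sketch of that loop (hypothetical `Page`/`listAll` names, not the Polaris API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of page-token pagination: fetch pages, accumulating
// items, until the next-page token comes back null.
public class PageTokenPagination {
  record Page(String nextToken, List<String> items) {}

  static List<String> listAll(Function<String, Page> fetch) {
    List<String> all = new ArrayList<>();
    String token = null; // null token means "first page", mirroring the test setup
    do {
      Page page = fetch.apply(token);
      all.addAll(page.items());
      token = page.nextToken();
    } while (token != null);
    return all;
  }

  public static void main(String[] args) {
    // Simulated server: first page returns token "page2", second page is last.
    Map<String, Page> server = new HashMap<>();
    server.put(null, new Page("page2", List.of("table1", "table2")));
    server.put("page2", new Page(null, List.of("table3")));
    System.out.println(listAll(server::get)); // [table1, table2, table3]
  }
}
```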
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/DeltaHelperTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/DeltaHelperTest.java
new file mode 100644
index 000000000..15a551a43
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/DeltaHelperTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.polaris.spark.NoopDeltaCatalog;
+import org.apache.polaris.spark.PolarisSparkCatalog;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+
+public class DeltaHelperTest {
+
+  @Test
+  public void testLoadDeltaCatalogWithNoopDeltaCatalog() {
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog"));
+    DeltaHelper helper = new DeltaHelper(options);
+    PolarisSparkCatalog polarisSparkCatalog = mock(PolarisSparkCatalog.class);
+
+    TableCatalog deltaCatalog = helper.loadDeltaCatalog(polarisSparkCatalog);
+
+    assertThat(deltaCatalog).isNotNull();
+    assertThat(deltaCatalog).isInstanceOf(NoopDeltaCatalog.class);
+    assertThat(deltaCatalog).isInstanceOf(DelegatingCatalogExtension.class);
+  }
+
+  @Test
+  public void testLoadDeltaCatalogCachesInstance() {
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog"));
+    DeltaHelper helper = new DeltaHelper(options);
+    PolarisSparkCatalog polarisSparkCatalog = mock(PolarisSparkCatalog.class);
+
+    TableCatalog deltaCatalog1 = helper.loadDeltaCatalog(polarisSparkCatalog);
+    TableCatalog deltaCatalog2 = helper.loadDeltaCatalog(polarisSparkCatalog);
+
+    // Should return the same cached instance
+    assertThat(deltaCatalog1).isSameAs(deltaCatalog2);
+  }
+
+  @Test
+  public void testLoadDeltaCatalogWithNonExistentClass() {
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                DeltaHelper.DELTA_CATALOG_IMPL_KEY, "com.example.NonExistentDeltaCatalog"));
+    DeltaHelper helper = new DeltaHelper(options);
+    PolarisSparkCatalog polarisSparkCatalog = mock(PolarisSparkCatalog.class);
+
+    assertThatThrownBy(() -> helper.loadDeltaCatalog(polarisSparkCatalog))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot initialize Delta Catalog")
+        .hasMessageContaining("com.example.NonExistentDeltaCatalog");
+  }
+
+  @Test
+  public void testLoadDeltaCatalogSetsIsUnityCatalogField() throws Exception {
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog"));
+    DeltaHelper helper = new DeltaHelper(options);
+    PolarisSparkCatalog polarisSparkCatalog = mock(PolarisSparkCatalog.class);
+
+    TableCatalog deltaCatalog = helper.loadDeltaCatalog(polarisSparkCatalog);
+
+    // Verify that the isUnityCatalog field is set to true using reflection
+    java.lang.reflect.Field field = deltaCatalog.getClass().getDeclaredField("isUnityCatalog");
+    field.setAccessible(true);
+    boolean isUnityCatalog = (boolean) field.get(deltaCatalog);
+
+    assertThat(isUnityCatalog).isTrue();
+  }
+
+  @Test
+  public void testLoadDeltaCatalogWithCaseInsensitiveOptions() {
+    // Test that options are case-insensitive
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of("DELTA-CATALOG-IMPL", "org.apache.polaris.spark.NoopDeltaCatalog"));
+    DeltaHelper helper = new DeltaHelper(options);
+    PolarisSparkCatalog polarisSparkCatalog = mock(PolarisSparkCatalog.class);
+
+    TableCatalog deltaCatalog = helper.loadDeltaCatalog(polarisSparkCatalog);
+
+    assertThat(deltaCatalog).isNotNull();
+    assertThat(deltaCatalog).isInstanceOf(NoopDeltaCatalog.class);
+  }
+}
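`testLoadDeltaCatalogCachesInstance` above asserts that the helper builds the delegate catalog once and then returns the same instance on later calls. A minimal sketch of that lazy single-instance pattern (hypothetical names; the real `DeltaHelper` is not shown here):

```java
import java.util.function.Supplier;

// Hypothetical sketch of lazy single-instance caching: the factory runs once,
// and every later load() returns the cached object.
public class LazyCatalogCache {
  private Object cached;
  private final Supplier<Object> factory;

  LazyCatalogCache(Supplier<Object> factory) {
    this.factory = factory;
  }

  Object load() {
    if (cached == null) {
      cached = factory.get(); // first call builds the instance
    }
    return cached; // later calls return the same instance
  }

  public static void main(String[] args) {
    LazyCatalogCache cache = new LazyCatalogCache(Object::new);
    System.out.println(cache.load() == cache.load()); // true
  }
}
```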
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/PolarisCatalogUtilsTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/PolarisCatalogUtilsTest.java
new file mode 100644
index 000000000..cfe323695
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/utils/PolarisCatalogUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+public class PolarisCatalogUtilsTest {
+
+  @Test
+  public void testIsTableWithSparkManagedLocationWithNoLocationOrPath() {
+    Map<String, String> properties = ImmutableMap.of("key1", "value1", "key2", "value2");
+
+    assertThat(PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)).isTrue();
+  }
+
+  @Test
+  public void testIsTableWithSparkManagedLocationWithLocation() {
+    Map<String, String> properties =
+        ImmutableMap.of(TableCatalog.PROP_LOCATION, "s3://bucket/path");
+
+    assertThat(PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)).isFalse();
+  }
+
+  @Test
+  public void testIsTableWithSparkManagedLocationWithPath() {
+    Map<String, String> properties =
+        ImmutableMap.of(PolarisCatalogUtils.TABLE_PATH_KEY, "s3://bucket/path");
+
+    assertThat(PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)).isFalse();
+  }
+
+  @Test
+  public void testIsTableWithSparkManagedLocationWithBothLocationAndPath() {
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableCatalog.PROP_LOCATION,
+            "s3://bucket/location",
+            PolarisCatalogUtils.TABLE_PATH_KEY,
+            "s3://bucket/path");
+
+    assertThat(PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)).isFalse();
+  }
+
+  @Test
+  public void testIsTableWithSparkManagedLocationWithEmptyProperties() {
+    Map<String, String> properties = ImmutableMap.of();
+
+    assertThat(PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)).isTrue();
+  }
+
+  @ParameterizedTest
+  @CsvSource({
+    "parquet, false, false",
+    "csv, false, false",
+    "orc, false, false",
+    "json, false, false",
+    "avro, false, false",
+    "delta, false, true",
+    "iceberg, true, false",
+    "DELTA, false, true",
+    "ICEBERG, true, false",
+    "DeLta, false, true",
+    "IceBerg, true, false"
+  })
+  public void testProviderDetectionForOtherFormats(
+      String provider, boolean expectedIceberg, boolean expectedDelta) {
+    assertThat(PolarisCatalogUtils.useIceberg(provider)).isEqualTo(expectedIceberg);
+    assertThat(PolarisCatalogUtils.useDelta(provider)).isEqualTo(expectedDelta);
+  }
+}
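The `isTableWithSparkManagedLocation` tests above check a simple rule: a table's location is Spark-managed only when neither a location nor a path property is set. A minimal sketch of that check (hypothetical class name; literal `"location"`/`"path"` keys stand in for `TableCatalog.PROP_LOCATION` and `PolarisCatalogUtils.TABLE_PATH_KEY`):

```java
import java.util.Map;

// Hypothetical sketch of the Spark-managed-location check: the location is
// Spark-managed only if the table properties carry neither "location" nor "path".
public class SparkManagedLocationCheck {
  static boolean isSparkManagedLocation(Map<String, String> props) {
    return !props.containsKey("location") && !props.containsKey("path");
  }

  public static void main(String[] args) {
    System.out.println(isSparkManagedLocation(Map.of()));                        // true
    System.out.println(isSparkManagedLocation(Map.of("location", "s3://b/p")));  // false
    System.out.println(isSparkManagedLocation(Map.of("path", "s3://b/p")));      // false
  }
}
```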
diff --git a/site/content/in-dev/unreleased/generic-table.md b/site/content/in-dev/unreleased/generic-table.md
index a56824860..e1c9672e1 100644
--- a/site/content/in-dev/unreleased/generic-table.md
+++ b/site/content/in-dev/unreleased/generic-table.md
@@ -22,15 +22,19 @@ type: docs
 weight: 435
 ---
 
-The Generic Table in Apache Polaris is designed to provide support for non-Iceberg tables across different table formats includes delta, csv etc. It currently provides the following capabilities:
+Generic tables are non-Iceberg tables that can be in any of several formats, including Delta, CSV, etc. With this framework, you can:
 - Create a generic table under a namespace
 - Load a generic table
 - Drop a generic table
 - List all generic tables under a namespace
 
+{{< alert important >}}
+Generic table support is in beta. Please use it with caution and report any issues you encounter.
+{{< /alert >}}
+
 ## What is a Generic Table?
 
-A generic table in Polaris is an entity that defines the following fields:
+A generic table is an entity that defines the following fields:
 
 - **name** (required): A unique identifier for the table within a namespace
 - **format** (required): The format for the generic table, i.e. "delta", "csv"
@@ -45,8 +49,8 @@ A generic table in Polaris is an entity that defines the following fields:
 
 ## Generic Table API Vs. Iceberg Table API
 
-Generic Table provides a different set of APIs to operate on the generic table entities while Iceberg APIs operates on
-the Iceberg table entities.
+Polaris provides a set of generic table APIs different from the Iceberg APIs. The following table
+shows the comparison between the two APIs:
 
 | Operations   | **Iceberg Table API** | **Generic Table API** |
 |--------------|-----------------------|-----------------------|
@@ -55,8 +59,7 @@ the Iceberg table entities.
 | Drop Table   | Drop an Iceberg table. Similar to load table, if the table to drop is a generic table, a TableNotFoundException will be thrown. | Drop a generic table. Dropping an Iceberg table through the generic table endpoint will throw a TableNotFoundException. |
 | List Table   | List all Iceberg tables | List all generic tables |
 
-Note that generic table shares the same namespace with Iceberg tables, the table name has to be unique under the same namespace. Furthermore, since
-there is currently no support for Update Generic Table, any update to the existing table requires a drop and re-create.
+Note that generic tables share the same namespace with Iceberg tables, so a table name has to be unique within a namespace.
 
 ## Working with Generic Table
 
@@ -155,13 +158,10 @@ curl -X DELETE http://localhost:8181/api/catalog/polaris/v1/delta_catalog/namesp
 
 For the complete and up-to-date API specification, see the [Catalog API Spec](https://editor-next.swagger.io/?url=https://raw.githubusercontent.com/apache/polaris/refs/heads/main/spec/generated/bundled-polaris-catalog-service.yaml).
 
-## Limitations
-
-Current limitations of Generic Table support:
-1) Limited spec information. Currently, there is no spec for information like Schema, Partition etc.
-2) No commit coordination or update capability provided at the catalog service level.
+## Current Limitations
 
-Therefore, the catalog itself is unaware of anything about the underlying table except some of the loosely defined metadata.
-It is the responsibility of the engine (and plugins used by the engine) to determine exactly how loading or committing data
-should look like based on the metadata. For example, with the delta support, th delta log serialization, deserialization
-and update all happens at client side.
+There are some known limitations of generic table support:
+1. Generic tables provide limited spec information. For example, there is no spec for Schema or Partition.
+2. There is no commit coordination provided by Polaris. It is the responsibility of the engine to coordinate loading and committing data. The catalog is only aware of the generic table fields above.
+3. There is no update capability provided by Polaris. Any update to a generic table must be done through a drop and re-create.
+4. Generic table APIs do not support credential vending.
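Because there is no update endpoint, "updating" a generic table means deleting the entity and registering it again. The sketch below illustrates that flow; the payload field names and the endpoint paths in the comments are assumptions for illustration, so consult the bundled Catalog API spec for the exact schema.

```python
import json

# Hypothetical payload for registering a generic (here, Delta) table.
# Field names are assumptions drawn loosely from the generic table
# description above, not an authoritative request shape.
payload = {
    "name": "customer_events",
    "format": "delta",
    "base-location": "s3://my-bucket/delta_ns/customer_events",
    "doc": "Delta table registered through the Generic Table API",
    "properties": {"owner": "data-eng"},
}

body = json.dumps(payload)

# No update endpoint exists (limitation 3), so an "update" is a DELETE of
# the old entity followed by a fresh POST with the new payload, e.g.:
#   DELETE {base}/namespaces/delta_ns/generic-tables/customer_events
#   POST   {base}/namespaces/delta_ns/generic-tables   (body above)
print(body)
```

Since the catalog only stores these loosely defined fields, everything else (schema, partitioning, commit protocol) stays with the engine.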
diff --git a/site/content/in-dev/unreleased/polaris-spark-client.md b/site/content/in-dev/unreleased/polaris-spark-client.md
index c990e565a..6c3cab754 100644
--- a/site/content/in-dev/unreleased/polaris-spark-client.md
+++ b/site/content/in-dev/unreleased/polaris-spark-client.md
@@ -22,17 +22,17 @@ type: docs
 weight: 650
 ---
 
-Apache Polaris now provides Catalog support for Generic Tables (non-Iceberg tables), please check out
-the [Polaris Catalog OpenAPI Spec]({{% ref "polaris-api-specs/polaris-catalog-api.md" %}}) for Generic Table API specs.
+Polaris provides a Spark client to manage non-Iceberg tables through [Generic Tables]({{% ref "generic-table.md" %}}).
 
-Along with the Generic Table Catalog support, Polaris is also releasing a Spark client, which helps to
-provide an end-to-end solution for Apache Spark to manage Delta tables using Polaris.
+{{< alert note >}}
+The Spark client can manage Iceberg tables and non-Iceberg tables.
 
-Note the Polaris Spark client is able to handle both Iceberg and Delta tables, not just Delta.
+Users who only use Iceberg tables do not need this client and can use the Iceberg-provided Spark client instead.
+{{< /alert >}}
 
 This page documents how to connect Spark with Polaris Service using the Polaris Spark client.
 
-## Quick Start with Local Polaris service
+## Quick Start with Local Polaris Service
 If you want to quickly try out the functionality with a local Polaris service, simply check out the Polaris repo
 and follow the instructions in the Spark plugin getting-started
 [README](https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/getting-started/README.md).
@@ -42,7 +42,7 @@ Check out the Polaris repo:
 git clone https://github.com/apache/polaris.git ~/polaris
 ```
 
-## Start Spark against a deployed Polaris service
+## Start Spark against a Deployed Polaris Service
 Before starting, ensure that the deployed Polaris service supports Generic Tables, and that Spark 3.5 (version 3.5.3 or later) is installed.
 Spark 3.5.6 is recommended, and you can follow the instructions below to get a Spark 3.5.6 distribution.
 ```shell
@@ -53,7 +53,7 @@ tar xzvf spark-3.5.6-bin-hadoop3.tgz -C spark-3.5 --strip-components=1
 cd spark-3.5
 ```
 
-### Connecting with Spark using the Polaris Spark client
+### Connecting with Spark using the Polaris Spark Client
 The following CLI command can be used to start Spark with a connection to the deployed Polaris service using
 a released Polaris Spark client.
 
@@ -102,7 +102,7 @@ spark = SparkSession.builder
 As with the CLI command, make sure the corresponding fields are replaced correctly.
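As a sanity check on those fields, the connection boils down to a handful of `spark.sql.catalog.*` entries. The sketch below assumes a catalog named `polaris` and uses placeholder values; only the `org.apache.polaris.spark.SparkCatalog` class name comes from the plugin itself.

```python
# Minimal sketch of the catalog settings the CLI and PySpark snippets set.
# The catalog name "polaris", the URI, and the credential placeholders are
# examples to be replaced with your deployment's values.
catalog = "polaris"
conf = {
    f"spark.sql.catalog.{catalog}": "org.apache.polaris.spark.SparkCatalog",
    f"spark.sql.catalog.{catalog}.uri": "http://localhost:8181/api/catalog",   # Polaris endpoint
    f"spark.sql.catalog.{catalog}.warehouse": "<catalog-name>",                # Polaris catalog to use
    f"spark.sql.catalog.{catalog}.credential": "<client-id>:<client-secret>",  # principal credentials
    f"spark.sql.catalog.{catalog}.scope": "PRINCIPAL_ROLE:ALL",
}

for key, value in conf.items():
    print(f"--conf {key}={value}")
```

Each entry maps one-to-one to a `--conf` flag in the CLI form or a `.config(...)` call on the `SparkSession` builder.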
 
 ### Create tables with Spark
-After Spark is started, you can use it to create and access Iceberg and Delta tables, for example:
+After Spark is started, you can use it to create and access Iceberg and Delta tables. For example:
 ```python
 spark.sql("USE polaris")
 spark.sql("CREATE NAMESPACE IF NOT EXISTS DELTA_NS")
@@ -119,11 +119,16 @@ If you would like to use a version of the Spark client that is currently not yet
 build a Spark client jar locally from source. Please check out the Polaris repo and refer to the Spark plugin
 [README](https://github.com/apache/polaris/blob/main/plugins/spark/README.md) for detailed instructions.
 
-## Limitations
-The Polaris Spark client has the following functionality limitations:
-1) Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `Dataframe`
+## Current Limitations
+The following describes the current limitations of the Polaris Spark client:
+
+### General Limitations
+1. The Polaris Spark client only supports Iceberg and Delta tables. It does not support other table formats like CSV or JSON.
+2. Generic table (non-Iceberg table) APIs do not currently support credential vending.
+
+### Delta Table Limitations
+1. Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `DataFrame`
    is also not supported, since it relies on the CTAS support.
-2) Create a Delta table without explicit location is not supported.
-3) Rename a Delta table is not supported.
-4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
-5) For other non-Iceberg tables like csv, it is not supported.
+2. Creating a Delta table without an explicit location is not supported.
+3. Renaming a Delta table is not supported.
+4. ALTER TABLE ... SET LOCATION is not supported for Delta tables.
\ No newline at end of file
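Within those Delta limitations, a create statement that works is one with an explicit `LOCATION` and no CTAS. A minimal sketch, where the namespace, table name, and storage path are placeholders:

```python
# DDL that stays within the limitations above: an explicit LOCATION is
# given (no-location creates are unsupported) and no CTAS is used.
ddl = (
    "CREATE TABLE IF NOT EXISTS DELTA_NS.sales (id INT, amount DOUBLE) "
    "USING DELTA LOCATION 's3://my-bucket/delta_ns/sales'"
)
# With a running session from the snippets above: spark.sql(ddl)
print(ddl)
```

Data can then be written with plain `INSERT INTO` statements, which avoids the unsupported `saveAsTable` path.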

