This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ff3f385b4f [#11534] fix(spark-connector): GravitinoGlueCatalog does 
not invalidate Iceberg SparkCatalog cache after table mutations (#11559)
ff3f385b4f is described below

commit ff3f385b4fba90ceaea9b57459fa6f55de720b5c
Author: Yuhui <[email protected]>
AuthorDate: Wed Jun 10 21:56:45 2026 +0800

    [#11534] fix(spark-connector): GravitinoGlueCatalog does not invalidate 
Iceberg SparkCatalog cache after table mutations (#11559)
    
    ### What changes were proposed in this pull request?
    
    Override `invalidateTable` in `GravitinoGlueCatalog` to also clear
    `icebergGlueCatalog`'s cache. Change the `BaseCatalog` mutation methods
    (ALTER, DROP, PURGE, RENAME) to call `invalidateTable(ident)` instead of
    calling `sparkCatalog.invalidateTable(...)` directly, so that virtual
    dispatch reaches the override.
    
    ### Why are the changes needed?
    
    `icebergGlueCatalog`'s internal `CachingCatalog` was never invalidated
    after table mutations, causing stale schema reads and
    `IllegalStateException: Couldn't find <col> in [...]` after `ALTER TABLE
    ADD COLUMNS`.
    
    Fix #11534
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The stale-schema error on Iceberg Glue tables after
    ALTER/DROP/PURGE/RENAME is fixed.
    
    ### How was this patch tested?
    
    - Unit tests in `TestGravitinoGlueCatalog`
    - Integration test `testIcebergAlterTableAddColumnCacheInvalidation` in
    `SparkGlueCatalogIT`, verified against real AWS Glue + S3 (Spark 3.5)
---
 .../spark/connector/catalog/BaseCatalog.java       |  8 ++--
 .../spark/connector/glue/GravitinoGlueCatalog.java | 18 +++++++-
 .../connector/glue/TestGravitinoGlueCatalog.java   | 48 ++++++++++++++++++++++
 .../integration/test/glue/SparkGlueCatalogIT.java  | 32 +++++++++++++++
 4 files changed, 101 insertions(+), 5 deletions(-)

diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 6f93518ad5..228ef8a395 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -279,7 +279,7 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces, F
             .map(sparkTableChangeConverter::toGravitinoTableChange)
             .toArray(org.apache.gravitino.rel.TableChange[]::new);
     try {
-      sparkCatalog.invalidateTable(ident);
+      invalidateTable(ident);
       org.apache.gravitino.rel.Table gravitinoTable =
           gravitinoCatalogClient
               .asTableCatalog()
@@ -301,7 +301,7 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces, F
 
   @Override
   public boolean dropTable(Identifier ident) {
-    sparkCatalog.invalidateTable(ident);
+    invalidateTable(ident);
     return gravitinoCatalogClient
         .asTableCatalog()
         .dropTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -309,7 +309,7 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces, F
 
   @Override
   public boolean purgeTable(Identifier ident) {
-    sparkCatalog.invalidateTable(ident);
+    invalidateTable(ident);
     return gravitinoCatalogClient
         .asTableCatalog()
         .purgeTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -354,7 +354,7 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces, F
     org.apache.gravitino.rel.TableChange rename =
         org.apache.gravitino.rel.TableChange.rename(newIdent.name());
     try {
-      sparkCatalog.invalidateTable(oldIdent);
+      invalidateTable(oldIdent);
       gravitinoCatalogClient
           .asTableCatalog()
           .alterTable(NameIdentifier.of(getDatabase(oldIdent), 
oldIdent.name()), rename);
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index 47100103c0..dfcef08b0f 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -69,7 +69,7 @@ public class GravitinoGlueCatalog extends BaseCatalog {
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoGlueCatalog.class);
 
   // Lazily initialized Iceberg GlueCatalog for Iceberg tables
-  private volatile SparkCatalog icebergGlueCatalog;
+  volatile SparkCatalog icebergGlueCatalog;
 
   // Store original config for Iceberg catalog initialization
   private String catalogName;
@@ -222,6 +222,22 @@ public class GravitinoGlueCatalog extends BaseCatalog {
     return new SparkHiveTypeConverter();
   }
 
+  /**
+   * Invalidates both the Hive backend and the Iceberg backend caches for the 
given table.
+   *
+   * <p>{@link BaseCatalog} only calls {@code sparkCatalog.invalidateTable}, 
which clears the {@link
+   * HiveTableCatalog} cache. The Iceberg {@link SparkCatalog} maintains its 
own {@code
+   * CachingCatalog} that must be invalidated separately after any table 
mutation (ALTER, DROP,
+   * PURGE, RENAME) to avoid stale schema errors on subsequent reads.
+   */
+  @Override
+  public void invalidateTable(Identifier ident) {
+    super.invalidateTable(ident);
+    if (icebergGlueCatalog != null) {
+      icebergGlueCatalog.invalidateTable(ident);
+    }
+  }
+
   /**
    * Returns true if the Gravitino table is an Iceberg-format table based on 
its properties.
    *
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 757bb6510b..37ad819fa5 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -21,6 +21,8 @@ package org.apache.gravitino.spark.connector.glue;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,8 +41,10 @@ import 
org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
 import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -243,6 +247,50 @@ public class TestGravitinoGlueCatalog {
     Assertions.assertNotNull(transformer);
   }
 
+  // -------------------------------------------------------------------------
+  // Test invalidateTable propagates to icebergGlueCatalog
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testInvalidateTableCallsBothCatalogsWhenIcebergInitialized() throws 
Exception {
+    TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+    SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+    Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+    GravitinoGlueCatalog catalog =
+        new GravitinoGlueCatalog() {
+          {
+            sparkCatalog = mockSparkCatalog;
+            icebergGlueCatalog = mockIcebergCatalog;
+          }
+        };
+
+    catalog.invalidateTable(ident);
+
+    verify(mockSparkCatalog).invalidateTable(ident);
+    verify(mockIcebergCatalog).invalidateTable(ident);
+  }
+
+  @Test
+  void testInvalidateTableCallsOnlySparkCatalogWhenIcebergNotInitialized() 
throws Exception {
+    TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+    SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+    Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+    GravitinoGlueCatalog catalog =
+        new GravitinoGlueCatalog() {
+          {
+            sparkCatalog = mockSparkCatalog;
+            // icebergGlueCatalog left null (not initialized)
+          }
+        };
+
+    catalog.invalidateTable(ident);
+
+    verify(mockSparkCatalog).invalidateTable(ident);
+    verify(mockIcebergCatalog, never()).invalidateTable(any());
+  }
+
   // -------------------------------------------------------------------------
   // Helper methods
   // -------------------------------------------------------------------------
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
index db79faf217..da65d3541c 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
@@ -390,6 +390,38 @@ public abstract class SparkGlueCatalogIT extends 
SparkGlueEnvIT {
     Assertions.assertEquals("1,name1,25", tableData.get(0));
   }
 
+  /**
+   * Regression test for https://github.com/apache/gravitino/issues/11534.
+   *
+   * <p>After ALTER TABLE ADD COLUMNS on an Iceberg table, querying the new 
column must succeed.
+   * Previously, icebergGlueCatalog's CachingCatalog was never invalidated 
after alterTable, causing
+   * {@code IllegalStateException: Couldn't find <newCol> in [<oldCols>]}.
+   */
+  @Test
+  void testIcebergAlterTableAddColumnCacheInvalidation() {
+    String tableName = "test_iceberg_alter_cache";
+    dropTableIfExists(tableName);
+    sql(
+        String.format(
+            "CREATE TABLE %s (id INT COMMENT 'id', name STRING COMMENT 'name') 
USING iceberg",
+            tableName));
+    sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+
+    // Warm up the Iceberg SparkCatalog's CachingCatalog with an initial read
+    List<String> before = getTableData(tableName);
+    Assertions.assertEquals(1, before.size());
+    Assertions.assertEquals("1,Alice", before.get(0));
+
+    // Add a column — must invalidate icebergGlueCatalog cache
+    sql(String.format("ALTER TABLE %s ADD COLUMNS (age INT)", tableName));
+
+    // Before the fix this threw:
+    //   IllegalStateException: Couldn't find age#N in [id#N, name#N]
+    List<String> after = getQueryData(String.format("SELECT id, name, age FROM 
%s", tableName));
+    Assertions.assertEquals(1, after.size());
+    Assertions.assertEquals("1,Alice,NULL", after.get(0));
+  }
+
   @Test
   void testRenameIcebergTable() {
     String oldName = "test_iceberg_rename_old";

Reply via email to