This is an automated email from the ASF dual-hosted git repository.

github-actions[bot] pushed a commit to branch cherry-pick-8e93bde3-to-branch-1.3
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 64b29c3147fd99844498480031f84ae713159df3
Author: Yuhui <[email protected]>
AuthorDate: Thu Jun 11 18:52:28 2026 +0800

    [#11592] fix(spark-connector): Fix INSERT into Glue Iceberg tables with 
time-based partition transforms (#11593)
    
    ### What changes were proposed in this pull request?
    
    Override `loadFunction` in `GravitinoGlueCatalog` to delegate to the
    Iceberg `SparkCatalog`, and add `testIcebergPartitions` to
    `SparkCommonIT` for Glue integration test.
    
    ### Why are the changes needed?
    
    Fix: #11592
    
    `GravitinoGlueCatalog` didn't override `loadFunction`, so Spark couldn't
    resolve Iceberg built-in partition functions (`days`, `hours`, `months`,
    `years`) during INSERT planning.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added `testIcebergPartitions` to `SparkCommonIT` and enabled it in
    `SparkGlueCatalogIT`. Verified against real AWS Glue
    (`SparkAwsGlueCatalogIT35`).
---
 .../spark/connector/glue/GravitinoGlueCatalog.java | 21 ++++++
 .../connector/glue/TestGravitinoGlueCatalog.java   | 40 +++++++++++
 .../connector/integration/test/SparkCommonIT.java  | 82 +++++++++++++++++++++
 .../integration/test/glue/SparkGlueCatalogIT.java  |  5 ++
 .../test/iceberg/SparkIcebergCatalogIT.java        | 84 ++--------------------
 5 files changed, 153 insertions(+), 79 deletions(-)

diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index dfcef08b0f..6ac600140a 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -37,10 +37,12 @@ import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,6 +224,25 @@ public class GravitinoGlueCatalog extends BaseCatalog {
     return new SparkHiveTypeConverter();
   }
 
+  /**
+   * Delegates function lookup to the Iceberg {@link SparkCatalog}, which 
registers Iceberg built-in
+   * partition functions ({@code days}, {@code hours}, {@code months}, {@code 
years}, etc.). Falls
+   * back to the Gravitino function registry for non-Iceberg functions.
+   *
+   * <p>Without this override, {@link 
org.apache.gravitino.spark.connector.catalog.BaseCatalog}
+   * would only query the Gravitino server's function registry, which does not 
include Iceberg
+   * built-ins. This causes {@code INSERT} into Iceberg tables partitioned by 
time-based transforms
+   * to fail with {@code AnalysisException: days(col) is not currently 
supported}.
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+    try {
+      return getOrCreateIcebergGlueCatalog().loadFunction(ident);
+    } catch (NoSuchFunctionException e) {
+      return super.loadFunction(ident);
+    }
+  }
+
   /**
    * Invalidates both the Hive backend and the Iceberg backend caches for the 
given table.
    *
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 37ad819fa5..af980f7309 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.spark.connector.glue;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -42,9 +43,11 @@ import 
org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
 import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
 import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -291,10 +294,47 @@ public class TestGravitinoGlueCatalog {
     verify(mockIcebergCatalog, never()).invalidateTable(any());
   }
 
+  // -------------------------------------------------------------------------
+  // Test loadFunction delegation to Iceberg catalog
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testLoadFunctionDelegatesToIcebergCatalog() throws 
NoSuchFunctionException {
+    SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+    UnboundFunction mockFunction = mock(UnboundFunction.class);
+    Identifier ident = Identifier.of(new String[] {}, "years");
+    when(mockIcebergCatalog.loadFunction(ident)).thenReturn(mockFunction);
+
+    Assertions.assertSame(
+        mockFunction, 
makeGlueCatalogWithIceberg(mockIcebergCatalog).loadFunction(ident));
+  }
+
+  @Test
+  void testLoadFunctionFallsBackToSuperWhenIcebergThrows() throws 
NoSuchFunctionException {
+    SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+    Identifier ident = Identifier.of(new String[] {}, "custom_func");
+    
doThrow(mock(NoSuchFunctionException.class)).when(mockIcebergCatalog).loadFunction(any());
+
+    GravitinoGlueCatalog catalog = 
makeGlueCatalogWithIceberg(mockIcebergCatalog);
+    // When Iceberg throws NoSuchFunctionException, the catch block calls 
super.loadFunction.
+    // In the test environment, super also throws; verify Iceberg was 
consulted first.
+    Assertions.assertThrows(Exception.class, () -> 
catalog.loadFunction(ident));
+    verify(mockIcebergCatalog).loadFunction(ident);
+  }
+
   // -------------------------------------------------------------------------
   // Helper methods
   // -------------------------------------------------------------------------
 
+  /** Creates a GravitinoGlueCatalog with the given Iceberg catalog 
pre-injected. */
+  private GravitinoGlueCatalog makeGlueCatalogWithIceberg(SparkCatalog 
icebergCatalog) {
+    return new GravitinoGlueCatalog() {
+      {
+        icebergGlueCatalog = icebergCatalog;
+      }
+    };
+  }
+
   /** Creates a mock Gravitino Table with the given properties. */
   private Table createMockGravitinoTable(java.util.Map<String, String> 
properties) {
     Table mockTable = mock(Table.class);
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index 7f637ff78b..b743787498 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,6 +130,11 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     return true;
   }
 
+  /** Returns whether this catalog supports Iceberg time-based partition 
transforms. */
+  protected boolean supportsIcebergPartitionTransforms() {
+    return false;
+  }
+
   protected SparkTableInfoChecker getTableInfoChecker() {
     return SparkTableInfoChecker.create();
   }
@@ -1059,4 +1065,80 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     String location = tableInfo.getTableLocation();
     Assertions.assertDoesNotThrow(() -> 
getSparkSession().read().parquet(location).printSchema());
   }
+
+  /**
+   * Returns column definitions for a simple Iceberg table with integer, 
string, and timestamp
+   * columns.
+   */
+  protected List<SparkColumnInfo> getIcebergSimpleTableColumn() {
+    return Arrays.asList(
+        SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"),
+        SparkColumnInfo.of("name", DataTypes.StringType, ""),
+        SparkColumnInfo.of("ts", DataTypes.TimestampType, null));
+  }
+
+  /** Returns the CREATE TABLE SQL for a simple Iceberg table with id, name, 
and ts columns. */
+  protected String getCreateIcebergSimpleTableString(String tableName) {
+    return String.format(
+        "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', 
ts TIMESTAMP)",
+        tableName);
+  }
+
+  @Test
+  @EnabledIf("supportsIcebergPartitionTransforms")
+  protected void testIcebergPartitions() {
+    Map<String, String> partitionPaths =
+        ImmutableMap.of(
+            "years", "name=a/name_trunc=a/id_bucket=4/ts_year=2024",
+            "months", "name=a/name_trunc=a/id_bucket=4/ts_month=2024-01",
+            "days", "name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01",
+            "hours", "name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12");
+    partitionPaths
+        .keySet()
+        .forEach(
+            func -> {
+              String tableName = 
String.format("test_iceberg_%s_partition_table", func);
+              dropTableIfExists(tableName);
+              sql(
+                  getCreateIcebergSimpleTableString(tableName)
+                      + String.format(
+                          " USING iceberg PARTITIONED BY (name, truncate(1, 
name), bucket(16, id), %s(ts));",
+                          func));
+              SparkTableInfo tableInfo = getTableInfo(tableName);
+              SparkTableInfoChecker checker =
+                  SparkTableInfoChecker.create()
+                      .withName(tableName)
+                      .withColumns(getIcebergSimpleTableColumn())
+                      .withIdentifyPartition(Collections.singletonList("name"))
+                      .withTruncatePartition(1, "name")
+                      .withBucketPartition(16, 
Collections.singletonList("id"));
+              switch (func) {
+                case "years":
+                  checker.withYearPartition("ts");
+                  break;
+                case "months":
+                  checker.withMonthPartition("ts");
+                  break;
+                case "days":
+                  checker.withDayPartition("ts");
+                  break;
+                case "hours":
+                  checker.withHourPartition("ts");
+                  break;
+                default:
+                  throw new IllegalArgumentException("Unsupported partition 
function: " + func);
+              }
+              checker.check(tableInfo);
+
+              sql(
+                  String.format(
+                      "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00' 
as timestamp));",
+                      tableName));
+              List<String> queryResult = getTableData(tableName);
+              Assertions.assertEquals(1, queryResult.size());
+              Assertions.assertEquals("2,a,2024-01-01 12:00:00", 
queryResult.get(0));
+              checkDirExists(
+                  new Path(tableInfo.getTableLocation(), "data/" + 
partitionPaths.get(func)));
+            });
+  }
 }
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
index da65d3541c..c981ab646b 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
@@ -125,6 +125,11 @@ public abstract class SparkGlueCatalogIT extends 
SparkGlueEnvIT {
     return false;
   }
 
+  @Override
+  protected boolean supportsIcebergPartitionTransforms() {
+    return true;
+  }
+
   @BeforeAll
   @Override
   protected void startUp() throws Exception {
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index bb692ac687..aa3552a933 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,6 +124,11 @@ public abstract class SparkIcebergCatalogIT extends 
SparkCommonIT {
     return true;
   }
 
+  @Override
+  protected boolean supportsIcebergPartitionTransforms() {
+    return true;
+  }
+
   @Override
   protected String getTableLocation(SparkTableInfo table) {
     return String.join(File.separator, table.getTableLocation(), "data");
@@ -213,67 +217,6 @@ public abstract class SparkIcebergCatalogIT extends 
SparkCommonIT {
             });
   }
 
-  @Test
-  void testIcebergPartitions() {
-    Map<String, String> partitionPaths = new HashMap<>();
-    partitionPaths.put("years", 
"name=a/name_trunc=a/id_bucket=4/ts_year=2024");
-    partitionPaths.put("months", 
"name=a/name_trunc=a/id_bucket=4/ts_month=2024-01");
-    partitionPaths.put("days", 
"name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01");
-    partitionPaths.put("hours", 
"name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12");
-
-    partitionPaths
-        .keySet()
-        .forEach(
-            func -> {
-              String tableName = 
String.format("test_iceberg_%s_partition_table", func);
-              dropTableIfExists(tableName);
-              String createTableSQL = 
getCreateIcebergSimpleTableString(tableName);
-              createTableSQL =
-                  createTableSQL
-                      + String.format(
-                          " PARTITIONED BY (name, truncate(1, name), 
bucket(16, id), %s(ts));",
-                          func);
-              sql(createTableSQL);
-              SparkTableInfo tableInfo = getTableInfo(tableName);
-              SparkTableInfoChecker checker =
-                  SparkTableInfoChecker.create()
-                      .withName(tableName)
-                      .withColumns(getIcebergSimpleTableColumn())
-                      .withIdentifyPartition(Collections.singletonList("name"))
-                      .withTruncatePartition(1, "name")
-                      .withBucketPartition(16, 
Collections.singletonList("id"));
-              switch (func) {
-                case "years":
-                  checker.withYearPartition("ts");
-                  break;
-                case "months":
-                  checker.withMonthPartition("ts");
-                  break;
-                case "days":
-                  checker.withDayPartition("ts");
-                  break;
-                case "hours":
-                  checker.withHourPartition("ts");
-                  break;
-                default:
-                  throw new IllegalArgumentException("UnSupported partition 
function: " + func);
-              }
-              checker.check(tableInfo);
-
-              String insertData =
-                  String.format(
-                      "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00' 
as timestamp));",
-                      tableName);
-              sql(insertData);
-              List<String> queryResult = getTableData(tableName);
-              Assertions.assertEquals(1, queryResult.size());
-              Assertions.assertEquals("2,a,2024-01-01 12:00:00", 
queryResult.get(0));
-              String partitionExpression = partitionPaths.get(func);
-              Path partitionPath = new Path(getTableLocation(tableInfo), 
partitionExpression);
-              checkDirExists(partitionPath);
-            });
-  }
-
   @Test
   void testIcebergMetadataColumns() throws NoSuchTableException {
     testMetadataColumns();
@@ -1173,23 +1116,6 @@ public abstract class SparkIcebergCatalogIT extends 
SparkCommonIT {
     Assertions.assertEquals("1,1,1;1,1,1;1,1,1;1,1,1", String.join(";", 
queryResult));
   }
 
-  private List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() {
-    return Arrays.asList(
-        SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id 
comment"),
-        SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""),
-        SparkTableInfo.SparkColumnInfo.of("ts", DataTypes.TimestampType, 
null));
-  }
-
-  /**
-   * Here we build a new `createIcebergSql` String for creating a table with a 
field of timestamp
-   * type to create the year/month,etc. partitions
-   */
-  private String getCreateIcebergSimpleTableString(String tableName) {
-    return String.format(
-        "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', 
ts TIMESTAMP)",
-        tableName);
-  }
-
   protected SparkMetadataColumnInfo[] getIcebergMetadataColumns() {
     return new SparkMetadataColumnInfo[] {
       new SparkMetadataColumnInfo("_spec_id", DataTypes.IntegerType, false),

Reply via email to