This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c4ba60d27b Flink: Backport fix cache refreshing in dynamic sink 
(#14765)
c4ba60d27b is described below

commit c4ba60d27b02d8618621ad701e52d51b9a98d0d5
Author: aiborodin <[email protected]>
AuthorDate: Fri Dec 5 19:08:30 2025 +1100

    Flink: Backport fix cache refreshing in dynamic sink (#14765)
    
    Backport #14406
---
 .../flink/sink/dynamic/TableMetadataCache.java     | 25 +++++++++++++---
 .../flink/sink/dynamic/TestTableMetadataCache.java | 33 ++++++++++++++++++++++
 .../flink/sink/dynamic/TableMetadataCache.java     | 25 +++++++++++++---
 .../flink/sink/dynamic/TestTableMetadataCache.java | 33 ++++++++++++++++++++++
 4 files changed, 108 insertions(+), 8 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf2..2c08a3486e 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
 
   private final Catalog catalog;
   private final long refreshMs;
+  private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int 
inputSchemasPerTableCacheMaximumSize) {
+    this(catalog, maximumSize, refreshMs, 
inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  TableMetadataCache(
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
+    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
   }
@@ -88,6 +101,7 @@ class TableMetadataCache {
     tableCache.put(
         identifier,
         new CacheItem(
+            cacheRefreshClock.millis(),
             true,
             table.refs().keySet(),
             table.schemas(),
@@ -186,14 +200,16 @@ class TableMetadataCache {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(
+          identifier, new CacheItem(cacheRefreshClock.millis(), false, null, 
null, null, 1));
       return Tuple2.of(false, e);
     }
   }
 
   private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
     return allowRefresh
-        && (cacheItem == null || cacheItem.created + refreshMs > 
System.currentTimeMillis());
+        && (cacheItem == null
+            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > 
refreshMs);
   }
 
   public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ class TableMetadataCache {
 
   /** Handles timeout for missing items only. Caffeine performance causes 
noticeable delays. */
   static class CacheItem {
-    private final long created = System.currentTimeMillis();
-
+    private final long createdTimestampMillis;
     private final boolean tableExists;
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ class TableMetadataCache {
     private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
+        long createdTimestampMillis,
         boolean tableExists,
         Set<String> branches,
         Map<Integer, Schema> tableSchemas,
         Map<Integer, PartitionSpec> specs,
         int inputSchemaCacheMaximumSize) {
+      this.createdTimestampMillis = createdTimestampMillis;
       this.tableExists = tableExists;
       this.branches = branches;
       this.tableSchemas = tableSchemas;
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 42c93c13c2..bf5b9f562f 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -92,4 +96,33 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Init cache
+    TableMetadataCache cache =
+        new TableMetadataCache(
+            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), 
ZoneId.systemDefault()));
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, 
SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = 
cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    TableMetadataCache.CacheItem cacheItem = 
cache.getInternalCache().get(tableIdentifier);
+    assertThat(cacheItem).isNotNull();
+    assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+  }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf2..2c08a3486e 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
 
   private final Catalog catalog;
   private final long refreshMs;
+  private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int 
inputSchemasPerTableCacheMaximumSize) {
+    this(catalog, maximumSize, refreshMs, 
inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  TableMetadataCache(
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
+    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
   }
@@ -88,6 +101,7 @@ class TableMetadataCache {
     tableCache.put(
         identifier,
         new CacheItem(
+            cacheRefreshClock.millis(),
             true,
             table.refs().keySet(),
             table.schemas(),
@@ -186,14 +200,16 @@ class TableMetadataCache {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(
+          identifier, new CacheItem(cacheRefreshClock.millis(), false, null, 
null, null, 1));
       return Tuple2.of(false, e);
     }
   }
 
   private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
     return allowRefresh
-        && (cacheItem == null || cacheItem.created + refreshMs > 
System.currentTimeMillis());
+        && (cacheItem == null
+            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > 
refreshMs);
   }
 
   public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ class TableMetadataCache {
 
   /** Handles timeout for missing items only. Caffeine performance causes 
noticeable delays. */
   static class CacheItem {
-    private final long created = System.currentTimeMillis();
-
+    private final long createdTimestampMillis;
     private final boolean tableExists;
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ class TableMetadataCache {
     private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
+        long createdTimestampMillis,
         boolean tableExists,
         Set<String> branches,
         Map<Integer, Schema> tableSchemas,
         Map<Integer, PartitionSpec> specs,
         int inputSchemaCacheMaximumSize) {
+      this.createdTimestampMillis = createdTimestampMillis;
       this.tableExists = tableExists;
       this.branches = branches;
       this.tableSchemas = tableSchemas;
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 42c93c13c2..bf5b9f562f 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -92,4 +96,33 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Init cache
+    TableMetadataCache cache =
+        new TableMetadataCache(
+            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), 
ZoneId.systemDefault()));
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, 
SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = 
cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    TableMetadataCache.CacheItem cacheItem = 
cache.getInternalCache().get(tableIdentifier);
+    assertThat(cacheItem).isNotNull();
+    assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+  }
 }

Reply via email to