This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8db3d2115d Flink: Fix cache refreshing in dynamic sink (#14406)
8db3d2115d is described below
commit 8db3d2115d948ab8b150cd0ca7ae3fe1a17aa1d3
Author: aiborodin <[email protected]>
AuthorDate: Fri Dec 5 19:07:23 2025 +1100
Flink: Fix cache refreshing in dynamic sink (#14406)
---
.../flink/sink/dynamic/TableMetadataCache.java | 25 +++++++++++++---
.../flink/sink/dynamic/TestTableMetadataCache.java | 33 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 85a5a4abf2..2c08a3486e 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
+import java.time.Clock;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
private final Catalog catalog;
private final long refreshMs;
+ private final Clock cacheRefreshClock;
private final int inputSchemasPerTableCacheMaximumSize;
private final Map<TableIdentifier, CacheItem> tableCache;
TableMetadataCache(
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
+ this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+ }
+
+ @VisibleForTesting
+ TableMetadataCache(
+ Catalog catalog,
+ int maximumSize,
+ long refreshMs,
+ int inputSchemasPerTableCacheMaximumSize,
+ Clock cacheRefreshClock) {
this.catalog = catalog;
this.refreshMs = refreshMs;
+ this.cacheRefreshClock = cacheRefreshClock;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCache = new LRUCache<>(maximumSize);
}
@@ -88,6 +101,7 @@ class TableMetadataCache {
tableCache.put(
identifier,
new CacheItem(
+ cacheRefreshClock.millis(),
true,
table.refs().keySet(),
table.schemas(),
@@ -186,14 +200,16 @@ class TableMetadataCache {
return EXISTS;
} catch (NoSuchTableException e) {
LOG.debug("Table doesn't exist {}", identifier, e);
- tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
+ tableCache.put(
+ identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
return Tuple2.of(false, e);
}
}
private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
return allowRefresh
- && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
+ && (cacheItem == null
+ || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
}
public void invalidate(TableIdentifier identifier) {
@@ -202,8 +218,7 @@ class TableMetadataCache {
/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
static class CacheItem {
- private final long created = System.currentTimeMillis();
-
+ private final long createdTimestampMillis;
private final boolean tableExists;
private final Set<String> branches;
private final Map<Integer, Schema> tableSchemas;
@@ -211,11 +226,13 @@ class TableMetadataCache {
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
private CacheItem(
+ long createdTimestampMillis,
boolean tableExists,
Set<String> branches,
Map<Integer, Schema> tableSchemas,
Map<Integer, PartitionSpec> specs,
int inputSchemaCacheMaximumSize) {
+ this.createdTimestampMillis = createdTimestampMillis;
this.tableExists = tableExists;
this.branches = branches;
this.tableSchemas = tableSchemas;
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 42c93c13c2..bf5b9f562f 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -20,9 +20,13 @@ package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -92,4 +96,33 @@ public class TestTableMetadataCache extends TestFlinkIcebergSinkBase {
assertThat(cache.getInternalCache()).isEmpty();
}
+
+ @Test
+ void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
+ // Create table
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+ Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+ // Init cache
+ TableMetadataCache cache =
+ new TableMetadataCache(
+ catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
+ cache.update(tableIdentifier, table);
+
+ // Cache schema
+ Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
+ assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+ // Cache schema with fewer fields
+ TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
+ assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+ assertThat(schemaInfo.compareResult())
+ .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+ // Assert both schemas are in cache
+ TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
+ assertThat(cacheItem).isNotNull();
+ assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
+ }
}