This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fc981b499a Flink: Log on cache refresh in dynamic sink (#14792)
fc981b499a is described below
commit fc981b499a09cef352f123daa70668d765dfe282
Author: aiborodin <[email protected]>
AuthorDate: Fri Dec 12 05:33:05 2025 +1100
Flink: Log on cache refresh in dynamic sink (#14792)
---
.../flink/sink/dynamic/TableMetadataCache.java | 30 ++++++++++++++++------
1 file changed, 22 insertions(+), 8 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index e790d9a929..3be8bbcd91 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -79,7 +79,7 @@ class TableMetadataCache {
CacheItem cached = tableCache.get(identifier);
if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
return EXISTS;
- } else if (needsRefresh(cached, true)) {
+ } else if (needsRefresh(identifier, cached, true)) {
return refreshTable(identifier);
} else {
return NOT_EXISTS;
@@ -116,7 +116,7 @@ class TableMetadataCache {
return branch;
}
- if (needsRefresh(cached, allowRefresh)) {
+ if (needsRefresh(identifier, cached, allowRefresh)) {
refreshTable(identifier);
return branch(identifier, branch, false);
} else {
@@ -156,7 +156,7 @@ class TableMetadataCache {
}
}
- if (needsRefresh(cached, allowRefresh)) {
+ if (needsRefresh(identifier, cached, allowRefresh)) {
refreshTable(identifier);
return schema(identifier, input, false, dropUnusedColumns);
} else if (compatible != null) {
@@ -186,7 +186,7 @@ class TableMetadataCache {
}
}
- if (needsRefresh(cached, allowRefresh)) {
+ if (needsRefresh(identifier, cached, allowRefresh)) {
refreshTable(identifier);
return spec(identifier, spec, false);
} else {
@@ -207,10 +207,24 @@ class TableMetadataCache {
}
}
- private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
- return allowRefresh
- && (cacheItem == null
- || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
+ private boolean needsRefresh(
+ TableIdentifier identifier, CacheItem cacheItem, boolean allowRefresh) {
+ if (!allowRefresh) {
+ return false;
+ }
+
+ if (cacheItem == null) {
+ return true;
+ }
+
+ long nowMillis = cacheRefreshClock.millis();
+ long timeElapsedMillis = nowMillis - cacheItem.createdTimestampMillis;
+ if (timeElapsedMillis > refreshMs) {
+ LOG.info("Refreshing table metadata for {} after {} millis", identifier, timeElapsedMillis);
+ return true;
+ }
+
+ return false;
}
public void invalidate(TableIdentifier identifier) {