This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 22f866687a Flink: Backport PR #16324 to v2.0 and v1.20 (#16338)
22f866687a is described below
commit 22f866687a7cae6b1475781998583f7da089b9f4
Author: Kevin Liu <[email protected]>
AuthorDate: Thu May 14 15:10:55 2026 -0400
Flink: Backport PR #16324 to v2.0 and v1.20 (#16338)
Agent-Logs-Url: https://github.com/kevinjqliu/iceberg/sessions/682ce8b4-890f-41a9-a89a-b1f2873be44c
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: kevinjqliu <[email protected]>
---
.../maintenance/operator/ListMetadataFiles.java | 1 +
.../operator/TestListMetadataFiles.java | 35 ++++++++++++++++++++++
.../maintenance/operator/ListMetadataFiles.java | 1 +
.../operator/TestListMetadataFiles.java | 35 ++++++++++++++++++++++
4 files changed, 72 insertions(+)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
index 3ae42c6083..f9000511c1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
@@ -66,6 +66,7 @@ public class ListMetadataFiles extends ProcessFunction<Trigger, String> {
public void processElement(Trigger trigger, Context ctx, Collector<String> collector)
throws Exception {
try {
+ table.refresh();
table
.snapshots()
.forEach(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
index bb8c74f3d5..8614c634f1 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
@@ -23,8 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ class TestListMetadataFiles extends OperatorTestBase {
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
}
}
+
+ @Test
+ void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+
+ try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+ ProcessFunctionTestHarnesses.forProcessFunction(
+ new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
+ testHarness.open();
+
+ // Add more snapshots AFTER the operator has been opened
+ insert(table, 2, "b");
+ insert(table, 3, "c");
+
+ OperatorTestBase.trigger(testHarness);
+
+ List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+ // Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
+ // Without table.refresh() in processElement, only snapshot 1's files would be emitted.
+ table.refresh();
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ assertThat(snapshots).hasSize(3);
+ for (Snapshot snapshot : snapshots) {
+ assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
+ }
+ // Verify total count matches what 3 snapshots should produce
+ assertThat(tableMetadataFiles).hasSize(24);
+
+ assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+ }
+ }
}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
index 3ae42c6083..f9000511c1 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
@@ -66,6 +66,7 @@ public class ListMetadataFiles extends ProcessFunction<Trigger, String> {
public void processElement(Trigger trigger, Context ctx, Collector<String> collector)
throws Exception {
try {
+ table.refresh();
table
.snapshots()
.forEach(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
index bb8c74f3d5..8614c634f1 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
@@ -23,8 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ class TestListMetadataFiles extends OperatorTestBase {
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
}
}
+
+ @Test
+ void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+
+ try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+ ProcessFunctionTestHarnesses.forProcessFunction(
+ new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
+ testHarness.open();
+
+ // Add more snapshots AFTER the operator has been opened
+ insert(table, 2, "b");
+ insert(table, 3, "c");
+
+ OperatorTestBase.trigger(testHarness);
+
+ List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+ // Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
+ // Without table.refresh() in processElement, only snapshot 1's files would be emitted.
+ table.refresh();
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ assertThat(snapshots).hasSize(3);
+ for (Snapshot snapshot : snapshots) {
+ assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
+ }
+ // Verify total count matches what 3 snapshots should produce
+ assertThat(tableMetadataFiles).hasSize(24);
+
+ assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+ }
+ }
}