This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 22f866687a Flink: Backport PR #16324 to v2.0 and v1.20 (#16338)
22f866687a is described below

commit 22f866687a7cae6b1475781998583f7da089b9f4
Author: Kevin Liu <[email protected]>
AuthorDate: Thu May 14 15:10:55 2026 -0400

    Flink: Backport PR #16324 to v2.0 and v1.20 (#16338)
    
    Agent-Logs-Url: https://github.com/kevinjqliu/iceberg/sessions/682ce8b4-890f-41a9-a89a-b1f2873be44c
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: kevinjqliu <[email protected]>
---
 .../maintenance/operator/ListMetadataFiles.java    |  1 +
 .../operator/TestListMetadataFiles.java            | 35 ++++++++++++++++++++++
 .../maintenance/operator/ListMetadataFiles.java    |  1 +
 .../operator/TestListMetadataFiles.java            | 35 ++++++++++++++++++++++
 4 files changed, 72 insertions(+)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
index 3ae42c6083..f9000511c1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
@@ -66,6 +66,7 @@ public class ListMetadataFiles extends ProcessFunction<Trigger, String> {
   public void processElement(Trigger trigger, Context ctx, Collector<String> 
collector)
       throws Exception {
     try {
+      table.refresh();
       table
           .snapshots()
           .forEach(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
index bb8c74f3d5..8614c634f1 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
@@ -23,8 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.List;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ class TestListMetadataFiles extends OperatorTestBase {
       
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
     }
   }
+
+  @Test
+  void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
+      testHarness.open();
+
+      // Add more snapshots AFTER the operator has been opened
+      insert(table, 2, "b");
+      insert(table, 3, "c");
+
+      OperatorTestBase.trigger(testHarness);
+
+      List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+      // Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
+      // Without table.refresh() in processElement, only snapshot 1's files would be emitted.
+      table.refresh();
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+      assertThat(snapshots).hasSize(3);
+      for (Snapshot snapshot : snapshots) {
+        assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
+      }
+      // Hard-coded expected total for 3 snapshots; update it if the emitted file set changes
+      assertThat(tableMetadataFiles).hasSize(24);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+    }
+  }
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
index 3ae42c6083..f9000511c1 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java
@@ -66,6 +66,7 @@ public class ListMetadataFiles extends ProcessFunction<Trigger, String> {
   public void processElement(Trigger trigger, Context ctx, Collector<String> 
collector)
       throws Exception {
     try {
+      table.refresh();
       table
           .snapshots()
           .forEach(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
index bb8c74f3d5..8614c634f1 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java
@@ -23,8 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.List;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ class TestListMetadataFiles extends OperatorTestBase {
       
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
     }
   }
+
+  @Test
+  void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
+      testHarness.open();
+
+      // Add more snapshots AFTER the operator has been opened
+      insert(table, 2, "b");
+      insert(table, 3, "c");
+
+      OperatorTestBase.trigger(testHarness);
+
+      List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+      // Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
+      // Without table.refresh() in processElement, only snapshot 1's files would be emitted.
+      table.refresh();
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+      assertThat(snapshots).hasSize(3);
+      for (Snapshot snapshot : snapshots) {
+        assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
+      }
+      // Hard-coded expected total for 3 snapshots; update it if the emitted file set changes
+      assertThat(tableMetadataFiles).hasSize(24);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+    }
+  }
 }

Reply via email to