This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new fafa5c5443 Flink: Fixed an issue where Flink batch entry was not 
accurate (#5642)
fafa5c5443 is described below

commit fafa5c5443276bc06ab53b00117d86f9903c0274
Author: xzw_deepnova <[email protected]>
AuthorDate: Wed Sep 7 12:43:08 2022 +0800

    Flink: Fixed an issue where Flink batch entry was not accurate (#5642)
---
 .../iceberg/flink/sink/IcebergStreamWriter.java    | 11 +++++++++
 .../flink/sink/TestIcebergStreamWriter.java        | 28 ++++++++++++++++++++--
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
index ba499876e2..9ea0349fb0 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
@@ -86,6 +86,9 @@ class IcebergStreamWriter<T> extends 
AbstractStreamOperator<WriteResult>
    // For a bounded stream, the checkpoint mechanism may not be enabled, so
    // we'd better emit the remaining completed files downstream before
    // closing the writer, so that we won't miss any of them.
+    // Note that if the task is not closed after calling endInput, a
+    // checkpoint may be triggered again, causing files to be sent repeatedly;
+    // the writer is marked as null after the last file is sent to guard
+    // against duplicated writes.
     flush();
   }
 
@@ -100,10 +103,18 @@ class IcebergStreamWriter<T> extends 
AbstractStreamOperator<WriteResult>
 
   /** close all open files and emit files to downstream committer operator */
   private void flush() throws IOException {
+    if (writer == null) {
+      return;
+    }
+
     long startNano = System.nanoTime();
     WriteResult result = writer.complete();
     writerMetrics.updateFlushResult(result);
     output.collect(new StreamRecord<>(result));
     
writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano));
+
+    // Set writer to null to prevent duplicate flushes in the corner case of
+    // prepareSnapshotPreBarrier happening after endInput.
+    writer = null;
   }
 }
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 2e134c61fd..c67d47c8c3 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -225,12 +225,36 @@ public class TestIcebergStreamWriter {
       Assert.assertEquals(0, result.deleteFiles().length);
       Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
 
-      // invoke endInput again.
       ((BoundedOneInput) testHarness.getOneInputOperator()).endInput();
 
       result = 
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
       Assert.assertEquals(0, result.deleteFiles().length);
-      Assert.assertEquals(expectedDataFiles * 2, result.dataFiles().length);
+      // Data files should not be sent again
+      Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
+    }
+  }
+
+  @Test
+  public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint() 
throws Exception {
+    try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
+        createIcebergStreamWriter()) {
+      testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
+      testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2);
+
+      testHarness.endInput();
+
+      long expectedDataFiles = partitioned ? 2 : 1;
+      WriteResult result = 
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
+      Assert.assertEquals(0, result.deleteFiles().length);
+      Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
+
+      testHarness.prepareSnapshotPreBarrier(1L);
+
+      result = 
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
+      Assert.assertEquals(0, result.deleteFiles().length);
+      // Ensure that once endInput has been triggered, a subsequent
+      // prepareSnapshotPreBarrier sends the WriteResult only once.
+      Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
     }
   }
 

Reply via email to