This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fafa5c5443 Flink: Fixed an issue where Flink batch entry was not accurate (#5642)
fafa5c5443 is described below
commit fafa5c5443276bc06ab53b00117d86f9903c0274
Author: xzw_deepnova <[email protected]>
AuthorDate: Wed Sep 7 12:43:08 2022 +0800
Flink: Fixed an issue where Flink batch entry was not accurate (#5642)
---
.../iceberg/flink/sink/IcebergStreamWriter.java | 11 +++++++++
.../flink/sink/TestIcebergStreamWriter.java | 28 ++++++++++++++++++++--
2 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
index ba499876e2..9ea0349fb0 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
@@ -86,6 +86,9 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
// For bounded stream, it may don't enable the checkpoint mechanism so
we'd better to emit the
// remaining completed files to downstream before closing the writer so
that we won't miss any
// of them.
+ // Note that if the task is not closed after calling endInput, checkpoint
may be triggered again
+ // causing files to be sent repeatedly, the writer is marked as null after
the last file is sent
+ // to guard against duplicated writes.
flush();
}
@@ -100,10 +103,18 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
/** close all open files and emit files to downstream committer operator */
private void flush() throws IOException {
+ if (writer == null) {
+ return;
+ }
+
long startNano = System.nanoTime();
WriteResult result = writer.complete();
writerMetrics.updateFlushResult(result);
output.collect(new StreamRecord<>(result));
writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNano));
+
+ // Set writer to null to prevent duplicate flushes in the corner case of
+ // prepareSnapshotPreBarrier happening after endInput.
+ writer = null;
}
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 2e134c61fd..c67d47c8c3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -225,12 +225,36 @@ public class TestIcebergStreamWriter {
Assert.assertEquals(0, result.deleteFiles().length);
Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
- // invoke endInput again.
((BoundedOneInput) testHarness.getOneInputOperator()).endInput();
result =
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
Assert.assertEquals(0, result.deleteFiles().length);
- Assert.assertEquals(expectedDataFiles * 2, result.dataFiles().length);
+ // Datafiles should not be sent again
+ Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
+ }
+ }
+
+ @Test
+ public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint()
throws Exception {
+ try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
+ createIcebergStreamWriter()) {
+ testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
+ testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2);
+
+ testHarness.endInput();
+
+ long expectedDataFiles = partitioned ? 2 : 1;
+ WriteResult result =
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
+ Assert.assertEquals(0, result.deleteFiles().length);
+ Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+
+ result =
WriteResult.builder().addAll(testHarness.extractOutputValues()).build();
+ Assert.assertEquals(0, result.deleteFiles().length);
+ // It should be ensured that after endInput is triggered, when
prepareSnapshotPreBarrier
+ // is triggered, write should only send WriteResult once
+ Assert.assertEquals(expectedDataFiles, result.dataFiles().length);
}
}