This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 75b759d [FLINK-20213][fs-connector] Partition commit is delayed when
records keep coming
75b759d is described below
commit 75b759d6a3a7e193521ab7f0c738b9faf908601c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 25 20:36:47 2020 +0800
[FLINK-20213][fs-connector] Partition commit is delayed when records keep
coming
This closes #14116
---
.../filesystem/stream/StreamingFileWriter.java | 25 +++++++++++++++++-----
.../filesystem/stream/StreamingFileWriterTest.java | 25 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 924ef98..d9609f6 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -36,7 +36,9 @@ import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TreeMap;
/**
* Operator for file system sink. It is a operator version of {@link StreamingFileSink}.
@@ -64,7 +66,9 @@ public class StreamingFileWriter extends
AbstractStreamOperator<CommitMessage>
private transient long currentWatermark;
- private transient Set<String> inactivePartitions;
+ private transient Set<String> currentNewPartitions;
+ private transient TreeMap<Long, Set<String>> newPartitions;
+ private transient Set<String> committablePartitions;
public StreamingFileWriter(
long bucketCheckInterval,
@@ -81,15 +85,18 @@ public class StreamingFileWriter extends
AbstractStreamOperator<CommitMessage>
buckets =
bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
// Set listener before the initialization of Buckets.
- inactivePartitions = new HashSet<>();
+ currentNewPartitions = new HashSet<>();
+ newPartitions = new TreeMap<>();
+ committablePartitions = new HashSet<>();
buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
@Override
public void bucketCreated(Bucket<RowData, String> bucket) {
+ currentNewPartitions.add(bucket.getBucketId());
}
@Override
public void bucketInactive(Bucket<RowData, String> bucket) {
- inactivePartitions.add(bucket.getBucketId());
+ committablePartitions.add(bucket.getBucketId());
}
});
@@ -106,6 +113,8 @@ public class StreamingFileWriter extends
AbstractStreamOperator<CommitMessage>
public void snapshotState(StateSnapshotContext context) throws
Exception {
super.snapshotState(context);
helper.snapshotState(context.getCheckpointId());
+ newPartitions.put(context.getCheckpointId(), new HashSet<>(currentNewPartitions));
+ currentNewPartitions.clear();
}
@Override
@@ -134,13 +143,19 @@ public class StreamingFileWriter extends
AbstractStreamOperator<CommitMessage>
private void commitUpToCheckpoint(long checkpointId) throws Exception {
helper.commitUpToCheckpoint(checkpointId);
+
+ NavigableMap<Long, Set<String>> headPartitions = this.newPartitions.headMap(checkpointId, true);
+ Set<String> partitions = new HashSet<>(committablePartitions);
+ committablePartitions.clear();
+ headPartitions.values().forEach(partitions::addAll);
+ headPartitions.clear();
+
CommitMessage message = new CommitMessage(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- new ArrayList<>(inactivePartitions));
+ new ArrayList<>(partitions));
output.collect(new StreamRecord<>(message));
- inactivePartitions.clear();
}
@Override
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
index 6ea42af..9bdafb6 100644
---
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -126,6 +126,31 @@ public class StreamingFileWriterTest {
}
}
+ @Test
+ public void testCommitImmediately() throws Exception {
+ try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+ harness.setup();
+ harness.initializeEmptyState();
+ harness.open();
+
+ harness.processElement(row("1"), 0);
+ harness.processElement(row("2"), 0);
+ harness.processElement(row("2"), 0);
+
+ harness.snapshot(1, 1);
+
+ // repeat partition 1
+ harness.processElement(row("1"), 0);
+
+ harness.processElement(row("3"), 0);
+ harness.processElement(row("4"), 0);
+
+ harness.notifyOfCompletedCheckpoint(1);
+ List<String> partitions = collect(harness);
+ Assert.assertEquals(Arrays.asList("1", "2"), partitions);
+ }
+ }
+
private static RowData row(String s) {
return GenericRowData.of(StringData.fromString(s));
}