This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 75b759d  [FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
75b759d is described below

commit 75b759d6a3a7e193521ab7f0c738b9faf908601c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 25 20:36:47 2020 +0800

    [FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
    
    This closes #14116
---
 .../filesystem/stream/StreamingFileWriter.java     | 25 +++++++++++++++++-----
 .../filesystem/stream/StreamingFileWriterTest.java | 25 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 924ef98..d9609f6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -36,7 +36,9 @@ import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMes
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}.
@@ -64,7 +66,9 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
        private transient long currentWatermark;
 
-       private transient Set<String> inactivePartitions;
+       private transient Set<String> currentNewPartitions;
+       private transient TreeMap<Long, Set<String>> newPartitions;
+       private transient Set<String> committablePartitions;
 
        public StreamingFileWriter(
                        long bucketCheckInterval,
@@ -81,15 +85,18 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
                buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
 
                // Set listener before the initialization of Buckets.
-               inactivePartitions = new HashSet<>();
+               currentNewPartitions = new HashSet<>();
+               newPartitions = new TreeMap<>();
+               committablePartitions = new HashSet<>();
                buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
                        @Override
                        public void bucketCreated(Bucket<RowData, String> bucket) {
+                               currentNewPartitions.add(bucket.getBucketId());
                        }
 
                        @Override
                        public void bucketInactive(Bucket<RowData, String> bucket) {
-                               inactivePartitions.add(bucket.getBucketId());
+                               committablePartitions.add(bucket.getBucketId());
                        }
                });
 
@@ -106,6 +113,8 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
        public void snapshotState(StateSnapshotContext context) throws Exception {
                super.snapshotState(context);
                helper.snapshotState(context.getCheckpointId());
+               newPartitions.put(context.getCheckpointId(), new HashSet<>(currentNewPartitions));
+               currentNewPartitions.clear();
        }
 
        @Override
@@ -134,13 +143,19 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
        private void commitUpToCheckpoint(long checkpointId) throws Exception {
                helper.commitUpToCheckpoint(checkpointId);
+
+               NavigableMap<Long, Set<String>> headPartitions = this.newPartitions.headMap(checkpointId, true);
+               Set<String> partitions = new HashSet<>(committablePartitions);
+               committablePartitions.clear();
+               headPartitions.values().forEach(partitions::addAll);
+               headPartitions.clear();
+
                CommitMessage message = new CommitMessage(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                
getRuntimeContext().getNumberOfParallelSubtasks(),
-                               new ArrayList<>(inactivePartitions));
+                               new ArrayList<>(partitions));
                output.collect(new StreamRecord<>(message));
-               inactivePartitions.clear();
        }
 
        @Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
index 6ea42af..9bdafb6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -126,6 +126,31 @@ public class StreamingFileWriterTest {
                }
        }
 
+       @Test
+       public void testCommitImmediately() throws Exception {
+               try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+                       harness.setup();
+                       harness.initializeEmptyState();
+                       harness.open();
+
+                       harness.processElement(row("1"), 0);
+                       harness.processElement(row("2"), 0);
+                       harness.processElement(row("2"), 0);
+
+                       harness.snapshot(1, 1);
+
+                       // repeat partition 1
+                       harness.processElement(row("1"), 0);
+
+                       harness.processElement(row("3"), 0);
+                       harness.processElement(row("4"), 0);
+
+                       harness.notifyOfCompletedCheckpoint(1);
+                       List<String> partitions = collect(harness);
+                       Assert.assertEquals(Arrays.asList("1", "2"), partitions);
+               }
+       }
+
        private static RowData row(String s) {
                return GenericRowData.of(StringData.fromString(s));
        }

Reply via email to