This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 155f17d  [FLINK-26564][connectors/filesystem] Fix the bug that CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests.
155f17d is described below

commit 155f17d62952fff5f5b583256f959f5fb63d1a8e
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Thu Mar 10 11:37:58 2022 +0800

    [FLINK-26564][connectors/filesystem] Fix the bug that CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests.
    
    This closes #19032.
---
 .../compactor/operator/CompactCoordinator.java     |  2 +-
 .../operator/CompactCoordinatorStateHandler.java   | 15 +++++++++++++-
 .../sink/compactor/operator/CompactorRequest.java  |  3 +++
 .../sink/compactor/CompactCoordinatorTest.java     | 23 ++++++++++++++--------
 4 files changed, 33 insertions(+), 10 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
index 23437a5..8509423 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
@@ -209,7 +209,7 @@ public class CompactCoordinator extends 
AbstractStreamOperator<CompactorRequest>
         PASS_THROUGH
     }
 
-    private static class CompactTrigger {
+    static class CompactTrigger {
         private final long threshold;
         private final int numCheckpointsBeforeCompaction;
 
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
index 2b147ae..1e4d0f9 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
@@ -21,6 +21,9 @@ package 
org.apache.flink.connector.file.sink.compactor.operator;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import 
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTrigger;
+import 
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTriggerResult;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -70,6 +73,11 @@ public class CompactCoordinatorStateHandler
                                 
.getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC),
                         committableSerializer);
 
+        // A default trigger to judge whether a pending file should be 
compacted or passed through
+        CompactTrigger trigger =
+                new CompactTrigger(
+                        
FileCompactStrategy.Builder.newBuilder().setSizeThreshold(0).build());
+
         Iterable<FileSinkCommittable> stateRemaining = 
remainingCommittableState.get();
         if (stateRemaining != null) {
             for (FileSinkCommittable committable : stateRemaining) {
@@ -77,7 +85,12 @@ public class CompactCoordinatorStateHandler
                 // compacting is not available now
                 String bucketId = committable.getBucketId();
                 CompactorRequest request = new CompactorRequest(bucketId);
-                request.addToCompact(committable);
+                if (committable.hasPendingFile()
+                        && trigger.onElement(committable) != 
CompactTriggerResult.PASS_THROUGH) {
+                    request.addToCompact(committable);
+                } else {
+                    request.addToPassthrough(committable);
+                }
                 output.collect(new StreamRecord<>(Either.Right(request)));
             }
         }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
index 86bc78f..483468a 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** Request of file compacting for {@link FileSink}. */
 @Internal
 public class CompactorRequest implements Serializable {
@@ -49,6 +51,7 @@ public class CompactorRequest implements Serializable {
     }
 
     public void addToCompact(FileSinkCommittable committable) {
+        checkState(committable.hasPendingFile());
         committableToCompact.add(committable);
     }
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
index 0fbfb9b..9a07683 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
@@ -341,6 +341,8 @@ public class CompactCoordinatorTest extends 
AbstractCompactTestBase {
         // without . prefix
         FileSinkCommittable committable2 = committable("0", "2", 6);
 
+        FileSinkCommittable cleanup3 = cleanupInprogress("0", "3", 7);
+
         OperatorSubtaskState state;
         try (OneInputStreamOperatorTestHarness<
                         CommittableMessage<FileSinkCommittable>, 
CompactorRequest>
@@ -351,6 +353,9 @@ public class CompactCoordinatorTest extends 
AbstractCompactTestBase {
             harness.processElement(message(committable0));
             Assert.assertEquals(0, harness.extractOutputValues().size());
 
+            harness.processElement(message(cleanup3));
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
             harness.prepareSnapshotPreBarrier(1);
             state = harness.snapshot(1, 1);
         }
@@ -374,34 +379,36 @@ public class CompactCoordinatorTest extends 
AbstractCompactTestBase {
             harness.initializeState(state);
             harness.open();
 
-            Assert.assertEquals(1, harness.extractOutputValues().size());
+            Assert.assertEquals(2, harness.extractOutputValues().size());
 
             harness.processElement(message(committable1));
             harness.processElement(message(committable2));
 
             List<Either<CommittableMessage<FileSinkCommittable>, 
CompactorRequest>> results =
                     harness.extractOutputValues();
-            Assert.assertEquals(3, results.size());
+            Assert.assertEquals(4, results.size());
 
             // restored request
             Assert.assertTrue(results.get(0).isRight());
             assertToCompact(results.get(0).right(), committable0);
 
+            assertToPassthrough(results.get(1).right(), cleanup3);
+
             // committable with . prefix should also be passed through
             Assert.assertTrue(
-                    results.get(1).isLeft()
-                            && results.get(1).left() instanceof 
CommittableWithLineage);
+                    results.get(2).isLeft()
+                            && results.get(2).left() instanceof 
CommittableWithLineage);
             Assert.assertEquals(
-                    ((CommittableWithLineage<FileSinkCommittable>) 
results.get(1).left())
+                    ((CommittableWithLineage<FileSinkCommittable>) 
results.get(2).left())
                             .getCommittable(),
                     committable1);
 
             // committable without . prefix should be passed through normally
             Assert.assertTrue(
-                    results.get(2).isLeft()
-                            && results.get(2).left() instanceof 
CommittableWithLineage);
+                    results.get(3).isLeft()
+                            && results.get(3).left() instanceof 
CommittableWithLineage);
             Assert.assertEquals(
-                    ((CommittableWithLineage<FileSinkCommittable>) 
results.get(2).left())
+                    ((CommittableWithLineage<FileSinkCommittable>) 
results.get(3).left())
                             .getCommittable(),
                     committable2);
         }

Reply via email to