This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 155f17d [FLINK-26564][connectors/filesystem] Fix the bug that CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests. 155f17d is described below commit 155f17d62952fff5f5b583256f959f5fb63d1a8e Author: Gen Luo <luogen...@gmail.com> AuthorDate: Thu Mar 10 11:37:58 2022 +0800 [FLINK-26564][connectors/filesystem] Fix the bug that CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests. This closes #19032. --- .../compactor/operator/CompactCoordinator.java | 2 +- .../operator/CompactCoordinatorStateHandler.java | 15 +++++++++++++- .../sink/compactor/operator/CompactorRequest.java | 3 +++ .../sink/compactor/CompactCoordinatorTest.java | 23 ++++++++++++++-------- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java index 23437a5..8509423 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java @@ -209,7 +209,7 @@ public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> PASS_THROUGH } - private static class CompactTrigger { + static class CompactTrigger { private final long threshold; private final int numCheckpointsBeforeCompaction; diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java index 2b147ae..1e4d0f9 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java @@ -21,6 +21,9 @@ package org.apache.flink.connector.file.sink.compactor.operator; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTrigger; +import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTriggerResult; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -70,6 +73,11 @@ public class CompactCoordinatorStateHandler .getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC), committableSerializer); + // A default trigger to judge whether a pending file should be compacted or passed through + CompactTrigger trigger = + new CompactTrigger( + FileCompactStrategy.Builder.newBuilder().setSizeThreshold(0).build()); + Iterable<FileSinkCommittable> stateRemaining = remainingCommittableState.get(); if (stateRemaining != null) { for (FileSinkCommittable committable : stateRemaining) { @@ -77,7 +85,12 @@ public class CompactCoordinatorStateHandler // compacting is not available now String bucketId = committable.getBucketId(); CompactorRequest request = new CompactorRequest(bucketId); - request.addToCompact(committable); + if (committable.hasPendingFile() + && trigger.onElement(committable) != CompactTriggerResult.PASS_THROUGH) { + request.addToCompact(committable); + } else { + request.addToPassthrough(committable); + } output.collect(new StreamRecord<>(Either.Right(request))); } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java index 86bc78f..483468a 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java @@ -26,6 +26,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.util.Preconditions.checkState; + /** Request of file compacting for {@link FileSink}. */ @Internal public class CompactorRequest implements Serializable { @@ -49,6 +51,7 @@ public class CompactorRequest implements Serializable { } public void addToCompact(FileSinkCommittable committable) { + checkState(committable.hasPendingFile()); committableToCompact.add(committable); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java index 0fbfb9b..9a07683 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java @@ -341,6 +341,8 @@ public class CompactCoordinatorTest extends AbstractCompactTestBase { // without . prefix FileSinkCommittable committable2 = committable("0", "2", 6); + FileSinkCommittable cleanup3 = cleanupInprogress("0", "3", 7); + OperatorSubtaskState state; try (OneInputStreamOperatorTestHarness< CommittableMessage<FileSinkCommittable>, CompactorRequest> @@ -351,6 +353,9 @@ public class CompactCoordinatorTest extends AbstractCompactTestBase { harness.processElement(message(committable0)); Assert.assertEquals(0, harness.extractOutputValues().size()); + harness.processElement(message(cleanup3)); + Assert.assertEquals(0, harness.extractOutputValues().size()); + harness.prepareSnapshotPreBarrier(1); state = harness.snapshot(1, 1); } @@ -374,34 +379,36 @@ public class CompactCoordinatorTest extends AbstractCompactTestBase { harness.initializeState(state); harness.open(); - Assert.assertEquals(1, harness.extractOutputValues().size()); + Assert.assertEquals(2, harness.extractOutputValues().size()); harness.processElement(message(committable1)); harness.processElement(message(committable2)); List<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> results = harness.extractOutputValues(); - Assert.assertEquals(3, results.size()); + Assert.assertEquals(4, results.size()); // restored request Assert.assertTrue(results.get(0).isRight()); assertToCompact(results.get(0).right(), committable0); + assertToPassthrough(results.get(1).right(), cleanup3); + // committable with . prefix should also be passed through Assert.assertTrue( - results.get(1).isLeft() - && results.get(1).left() instanceof CommittableWithLineage); + results.get(2).isLeft() + && results.get(2).left() instanceof CommittableWithLineage); Assert.assertEquals( - ((CommittableWithLineage<FileSinkCommittable>) results.get(1).left()) + ((CommittableWithLineage<FileSinkCommittable>) results.get(2).left()) .getCommittable(), committable1); // committable without . prefix should be passed through normally Assert.assertTrue( - results.get(2).isLeft() - && results.get(2).left() instanceof CommittableWithLineage); + results.get(3).isLeft() + && results.get(3).left() instanceof CommittableWithLineage); Assert.assertEquals( - ((CommittableWithLineage<FileSinkCommittable>) results.get(2).left()) + ((CommittableWithLineage<FileSinkCommittable>) results.get(3).left()) .getCommittable(), committable2); }