This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7a509c46e45 [FLINK-29509][streaming] Set subtaskId and numberOfSubtasks on CheckpointCommittableManagerImpl during deserialization. 7a509c46e45 is described below commit 7a509c46e45b9a91f2b7d01f13afcdef266b1faf Author: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> AuthorDate: Mon Oct 10 16:34:36 2022 +0200 [FLINK-29509][streaming] Set subtaskId and numberOfSubtasks on CheckpointCommittableManagerImpl during deserialization. When we recover the `CheckpointCommittableManager` we were ignoring the subtaskId it is recovered on. This becomes a problem when a sink uses a post-commit topology because multiple committer operators might forward committable summaries coming from the same subtaskId. This ticket implements a fix to use the subtaskId already present in the CommittableCollectorSerializer when recreating CheckpointCommittableManagerImpl during recovery. Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> --- .../CheckpointCommittableManagerImpl.java | 13 +-- .../CommittableCollectorSerializer.java | 2 + .../connector/sink2/CommittableSummaryAssert.java | 13 +++ .../operators/sink/CommitterOperatorTest.java | 56 ++++++++++- .../CheckpointCommittableManagerImplTest.java | 23 +++++ .../CommittableCollectorSerializerTest.java | 110 +++++++++++++++------ 6 files changed, 177 insertions(+), 40 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 4fd22f24ec0..b2a703c8db7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -45,18 +45,17 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa CheckpointCommittableManagerImpl( int subtaskId, int numberOfSubtasks, @Nullable Long checkpointId) { - this.subtaskId = subtaskId; - this.numberOfSubtasks = numberOfSubtasks; - this.checkpointId = checkpointId; - this.subtasksCommittableManagers = new HashMap<>(); + this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId); } CheckpointCommittableManagerImpl( Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers, + int subtaskId, + int numberOfSubtasks, @Nullable Long checkpointId) { this.subtasksCommittableManagers = checkNotNull(subtasksCommittableManagers); - this.subtaskId = 0; - this.numberOfSubtasks = 1; + this.subtaskId = subtaskId; + this.numberOfSubtasks = numberOfSubtasks; this.checkpointId = checkpointId; } @@ -158,6 +157,8 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa return new CheckpointCommittableManagerImpl<>( subtasksCommittableManagers.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, (e) -> e.getValue().copy())), + subtaskId, + numberOfSubtasks, checkpointId); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java index 07947b98a06..6f8495b6637 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java @@ -152,6 +152,8 @@ public final class CommittableCollectorSerializer<CommT> 
.collect( Collectors.toMap( SubtaskCommittableManager::getSubtaskId, e -> e)), + subtaskId, + numberOfSubtasks, checkpointId); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java index bc84e3146ec..fb1062a5091 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java @@ -36,6 +36,7 @@ public class CommittableSummaryAssert isNotNull(); assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId()); assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId()); + assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks()); assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables()); assertThat(actual.getNumberOfPendingCommittables()) .isEqualTo(summary.getNumberOfPendingCommittables()); @@ -44,6 +45,18 @@ public class CommittableSummaryAssert return this; } + public CommittableSummaryAssert hasSubtaskId(int subtaskId) { + isNotNull(); + assertThat(actual.getSubtaskId()).isEqualTo(subtaskId); + return this; + } + + public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) { + isNotNull(); + assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks); + return this; + } + public CommittableSummaryAssert hasOverallCommittables(int committableNumber) { isNotNull(); assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java index 204d664247c..d7b0b550206 100644 
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java @@ -219,15 +219,27 @@ class CommitterOperatorTest { @Test void testStateRestore() throws Exception { + + final int originalSubtaskId = 0; + final int subtaskIdAfterRecovery = 9; + final OneInputStreamOperatorTestHarness< CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(new TestSink.RetryOnceCommitter(), false, true); + testHarness = + createTestHarness( + new TestSink.RetryOnceCommitter(), + false, + true, + 1, + 1, + originalSubtaskId); testHarness.open(); final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 0L, 1, 1, 0); + new CommittableSummary<>(originalSubtaskId, 1, 0L, 1, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 0L, 1); + final CommittableWithLineage<String> first = + new CommittableWithLineage<>("1", 0L, originalSubtaskId); testHarness.processElement(new StreamRecord<>(first)); final OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L); @@ -239,9 +251,14 @@ class CommitterOperatorTest { testHarness.close(); final ForwardingCommitter committer = new ForwardingCommitter(); + + // create new testHarness but with different parallelism level and subtaskId than the original + // one. + // we will make sure that the new subtaskId was used during committable recovery. 
final OneInputStreamOperatorTestHarness< CommittableMessage<String>, CommittableMessage<String>> - restored = createTestHarness(committer, false, true); + restored = + createTestHarness(committer, false, true, 10, 10, subtaskIdAfterRecovery); restored.initializeState(snapshot); restored.open(); @@ -256,7 +273,9 @@ class CommitterOperatorTest { .hasPendingCommittables(0); SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1))) - .isEqualTo(new CommittableWithLineage<>(first.getCommittable(), 1L, 0)); + .isEqualTo( + new CommittableWithLineage<>( + first.getCommittable(), 1L, subtaskIdAfterRecovery)); restored.close(); } @@ -344,6 +363,33 @@ class CommitterOperatorTest { isCheckpointingEnabled)); } + private OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + createTestHarness( + Committer<String> committer, + boolean isBatchMode, + boolean isCheckpointingEnabled, + int maxParallelism, + int parallelism, + int subtaskId) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>( + (TwoPhaseCommittingSink<?, String>) + TestSink.newBuilder() + .setCommitter(committer) + .setDefaultGlobalCommitter() + .setCommittableSerializer( + TestSink.StringCommittableSerializer.INSTANCE) + .build() + .asV2(), + isBatchMode, + isCheckpointingEnabled), + maxParallelism, + parallelism, + subtaskId); + } + private static class ForwardingCommitter extends TestSink.DefaultCommitter { private int successfulCommits = 0; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index e47c21dd09c..a4c8c0f3c44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java 
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -21,8 +21,11 @@ package org.apache.flink.streaming.runtime.operators.sink.committables; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.util.Collection; @@ -93,6 +96,26 @@ class CheckpointCommittableManagerImplTest { .hasMessageContaining("FLINK-25920"); } + // check different values of subtaskId and numberOfSubtasks to make sure that no value is + // hardcoded. + @ParameterizedTest(name = "subtaskId = {0}, numberOfSubtasks = {1}, checkpointId = {2}") + @CsvSource({"1, 10, 100", "2, 20, 200", "3, 30, 300"}) + public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) { + + final CheckpointCommittableManagerImpl<Integer> original = + new CheckpointCommittableManagerImpl<>(subtaskId, numberOfSubtasks, checkpointId); + original.upsertSummary( + new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0, 0)); + + CheckpointCommittableManagerImpl<Integer> copy = original.copy(); + + assertThat(copy.getCheckpointId()).isEqualTo(checkpointId); + SinkV2Assertions.assertThat(copy.getSummary()) + .hasNumberOfSubtasks(numberOfSubtasks) + .hasSubtaskId(subtaskId) + .hasCheckpointId(checkpointId); + } + private static class NoOpCommitter implements Committer<Integer> { @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java index b12fed07b75..af9e4070d61 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.junit.jupiter.api.Test; @@ -35,6 +36,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; class CommittableCollectorSerializerTest { @@ -72,38 +74,88 @@ class CommittableCollectorSerializerTest { @Test void testCommittableCollectorV2SerDe() throws IOException { - final CommittableCollector<Integer> committableCollector = new CommittableCollector<>(2, 3); - committableCollector.addMessage(new CommittableSummary<>(2, 3, 1L, 1, 1, 0)); - committableCollector.addMessage(new CommittableSummary<>(2, 3, 2L, 1, 1, 0)); - committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, 2)); - committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, 2)); + int subtaskId = 2; + int numberOfSubtasks = 3; + + final CommittableCollectorSerializer<Integer> ccSerializer = + new CommittableCollectorSerializer<>( + COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks); + + final CommittableCollector<Integer> committableCollector = + new CommittableCollector<>(subtaskId, numberOfSubtasks); + 
committableCollector.addMessage( + new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1, 1, 0)); + committableCollector.addMessage( + new CommittableSummary<>(subtaskId, numberOfSubtasks, 2L, 1, 1, 0)); + committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, subtaskId)); + committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, subtaskId)); + final CommittableCollector<Integer> copy = - SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector)); + ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector)); // Expect the subtask Id equal to the origin of the collector - assertThat(copy.getSubtaskId()).isEqualTo(1); + assertThat(copy.getSubtaskId()).isEqualTo(subtaskId); assertThat(copy.isFinished()).isFalse(); - assertThat(copy.getNumberOfSubtasks()).isEqualTo(1); - final Collection<CheckpointCommittableManagerImpl<Integer>> checkpointCommittables = - committableCollector.getCheckpointCommittables(); - assertThat(checkpointCommittables).hasSize(2); - final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator = - checkpointCommittables.iterator(); - final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 = - committablesIterator.next().getSubtaskCommittableManager(2); - assertThat( - subtaskCommittableManagerCheckpoint1 - .getPendingRequests() - .map(CommitRequestImpl::getCommittable) - .collect(Collectors.toList())) - .containsExactly(1); - final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 = - committablesIterator.next().getSubtaskCommittableManager(2); - assertThat( - subtaskCommittableManagerCheckpoint2 - .getPendingRequests() - .map(CommitRequestImpl::getCommittable) - .collect(Collectors.toList())) - .containsExactly(2); + assertThat(copy.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks); + + // assert original CommittableCollector + assertCommittableCollector( + "Original CommittableCollector", subtaskId, numberOfSubtasks, 
committableCollector); + + // assert deserialized CommittableCollector + assertCommittableCollector( + "Deserialized CommittableCollector", subtaskId, numberOfSubtasks, copy); + } + + private void assertCommittableCollector( + String assertMessageHeading, + int subtaskId, + int numberOfSubtasks, + CommittableCollector<Integer> committableCollector) { + + assertAll( + assertMessageHeading, + () -> { + final Collection<CheckpointCommittableManagerImpl<Integer>> + checkpointCommittables = + committableCollector.getCheckpointCommittables(); + assertThat(checkpointCommittables).hasSize(2); + + final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator = + checkpointCommittables.iterator(); + final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 = + committablesIterator.next(); + final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 = + checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId); + + SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary()) + .hasSubtaskId(subtaskId) + .hasNumberOfSubtasks(numberOfSubtasks); + assertThat( + subtaskCommittableManagerCheckpoint1 + .getPendingRequests() + .map(CommitRequestImpl::getCommittable) + .collect(Collectors.toList())) + .containsExactly(1); + assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId()) + .isEqualTo(subtaskId); + + final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 = + committablesIterator.next(); + final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 = + checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId); + + SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary()) + .hasSubtaskId(subtaskId) + .hasNumberOfSubtasks(numberOfSubtasks); + assertThat( + subtaskCommittableManagerCheckpoint2 + .getPendingRequests() + .map(CommitRequestImpl::getCommittable) + .collect(Collectors.toList())) + .containsExactly(2); + 
assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId()) + .isEqualTo(subtaskId); + }); } }