This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 6b4882791dd [FLINK-29509][streaming] Set subtaskId and 
numberOfSubtasks on CheckpointCommittableManagerImpl during deserialization.
6b4882791dd is described below

commit 6b4882791dd0fd1b0df952ed7712ae7bd68adf36
Author: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
AuthorDate: Mon Oct 10 16:34:36 2022 +0200

    [FLINK-29509][streaming] Set subtaskId and numberOfSubtasks on 
CheckpointCommittableManagerImpl during deserialization.
    
    When recovering the `CheckpointCommittableManager`, we ignored the 
subtaskId it was recovered on and fell back to subtaskId 0 and 
numberOfSubtasks 1. This becomes a problem when a sink uses a post-commit 
topology, because multiple committer operators might then forward committable 
summaries that all claim to come from the same subtaskId.
    
    This commit fixes that by passing the subtaskId and numberOfSubtasks 
already known to the CommittableCollectorSerializer into 
CheckpointCommittableManagerImpl when it is recreated during recovery.
    
    Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
---
 .../CheckpointCommittableManagerImpl.java          |  13 +--
 .../CommittableCollectorSerializer.java            |   2 +
 .../connector/sink2/CommittableSummaryAssert.java  |  13 +++
 .../operators/sink/CommitterOperatorTest.java      |  56 ++++++++++-
 .../CheckpointCommittableManagerImplTest.java      |  23 +++++
 .../CommittableCollectorSerializerTest.java        | 110 +++++++++++++++------
 6 files changed, 177 insertions(+), 40 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 4fd22f24ec0..b2a703c8db7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -45,18 +45,17 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
 
     CheckpointCommittableManagerImpl(
             int subtaskId, int numberOfSubtasks, @Nullable Long checkpointId) {
-        this.subtaskId = subtaskId;
-        this.numberOfSubtasks = numberOfSubtasks;
-        this.checkpointId = checkpointId;
-        this.subtasksCommittableManagers = new HashMap<>();
+        this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId);
     }
 
     CheckpointCommittableManagerImpl(
             Map<Integer, SubtaskCommittableManager<CommT>> 
subtasksCommittableManagers,
+            int subtaskId,
+            int numberOfSubtasks,
             @Nullable Long checkpointId) {
         this.subtasksCommittableManagers = 
checkNotNull(subtasksCommittableManagers);
-        this.subtaskId = 0;
-        this.numberOfSubtasks = 1;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
         this.checkpointId = checkpointId;
     }
 
@@ -158,6 +157,8 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
         return new CheckpointCommittableManagerImpl<>(
                 subtasksCommittableManagers.entrySet().stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, (e) -> 
e.getValue().copy())),
+                subtaskId,
+                numberOfSubtasks,
                 checkpointId);
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
index 07947b98a06..6f8495b6637 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
@@ -152,6 +152,8 @@ public final class CommittableCollectorSerializer<CommT>
                             .collect(
                                     Collectors.toMap(
                                             
SubtaskCommittableManager::getSubtaskId, e -> e)),
+                    subtaskId,
+                    numberOfSubtasks,
                     checkpointId);
         }
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
index bc84e3146ec..fb1062a5091 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
@@ -36,6 +36,7 @@ public class CommittableSummaryAssert
         isNotNull();
         assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId());
         
assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId());
+        
assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks());
         
assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables());
         assertThat(actual.getNumberOfPendingCommittables())
                 .isEqualTo(summary.getNumberOfPendingCommittables());
@@ -44,6 +45,18 @@ public class CommittableSummaryAssert
         return this;
     }
 
+    public CommittableSummaryAssert hasSubtaskId(int subtaskId) {
+        isNotNull();
+        assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
+        return this;
+    }
+
+    public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) {
+        isNotNull();
+        assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
+        return this;
+    }
+
     public CommittableSummaryAssert hasOverallCommittables(int 
committableNumber) {
         isNotNull();
         
assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
index 204d664247c..d7b0b550206 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
@@ -219,15 +219,27 @@ class CommitterOperatorTest {
 
     @Test
     void testStateRestore() throws Exception {
+
+        final int originalSubtaskId = 0;
+        final int subtaskIdAfterRecovery = 9;
+
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(new 
TestSink.RetryOnceCommitter(), false, true);
+                testHarness =
+                        createTestHarness(
+                                new TestSink.RetryOnceCommitter(),
+                                false,
+                                true,
+                                1,
+                                1,
+                                originalSubtaskId);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
+                new CommittableSummary<>(originalSubtaskId, 1, 0L, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", 0L, 1);
+        final CommittableWithLineage<String> first =
+                new CommittableWithLineage<>("1", 0L, originalSubtaskId);
         testHarness.processElement(new StreamRecord<>(first));
 
         final OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L);
@@ -239,9 +251,14 @@ class CommitterOperatorTest {
         testHarness.close();
 
         final ForwardingCommitter committer = new ForwardingCommitter();
+
+        // create new testHarness but with different parallelism level and 
subtaskId than the original
+        // one.
+        // we will make sure that the new subtaskId was used during committable 
recovery.
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                restored = createTestHarness(committer, false, true);
+                restored =
+                        createTestHarness(committer, false, true, 10, 10, 
subtaskIdAfterRecovery);
 
         restored.initializeState(snapshot);
         restored.open();
@@ -256,7 +273,9 @@ class CommitterOperatorTest {
                 .hasPendingCommittables(0);
 
         SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
-                .isEqualTo(new 
CommittableWithLineage<>(first.getCommittable(), 1L, 0));
+                .isEqualTo(
+                        new CommittableWithLineage<>(
+                                first.getCommittable(), 1L, 
subtaskIdAfterRecovery));
         restored.close();
     }
 
@@ -344,6 +363,33 @@ class CommitterOperatorTest {
                         isCheckpointingEnabled));
     }
 
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(
+                    Committer<String> committer,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled,
+                    int maxParallelism,
+                    int parallelism,
+                    int subtaskId)
+                    throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(
+                        (TwoPhaseCommittingSink<?, String>)
+                                TestSink.newBuilder()
+                                        .setCommitter(committer)
+                                        .setDefaultGlobalCommitter()
+                                        .setCommittableSerializer(
+                                                
TestSink.StringCommittableSerializer.INSTANCE)
+                                        .build()
+                                        .asV2(),
+                        isBatchMode,
+                        isCheckpointingEnabled),
+                maxParallelism,
+                parallelism,
+                subtaskId);
+    }
+
     private static class ForwardingCommitter extends TestSink.DefaultCommitter 
{
         private int successfulCommits = 0;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index e47c21dd09c..a4c8c0f3c44 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -21,8 +21,11 @@ package 
org.apache.flink.streaming.runtime.operators.sink.committables;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -93,6 +96,26 @@ class CheckpointCommittableManagerImplTest {
                 .hasMessageContaining("FLINK-25920");
     }
 
+    // check different values of subtaskId and numberOfSubtasks to make sure 
that no value is
+    // hardcoded.
+    @ParameterizedTest(name = "subtaskId = {0}, numberOfSubtasks = {1}, 
checkpointId = {2}")
+    @CsvSource({"1, 10, 100", "2, 20, 200", "3, 30, 300"})
+    public void testCopy(int subtaskId, int numberOfSubtasks, long 
checkpointId) {
+
+        final CheckpointCommittableManagerImpl<Integer> original =
+                new CheckpointCommittableManagerImpl<>(subtaskId, 
numberOfSubtasks, checkpointId);
+        original.upsertSummary(
+                new CommittableSummary<>(subtaskId, numberOfSubtasks, 
checkpointId, 1, 0, 0));
+
+        CheckpointCommittableManagerImpl<Integer> copy = original.copy();
+
+        assertThat(copy.getCheckpointId()).isEqualTo(checkpointId);
+        SinkV2Assertions.assertThat(copy.getSummary())
+                .hasNumberOfSubtasks(numberOfSubtasks)
+                .hasSubtaskId(subtaskId)
+                .hasCheckpointId(checkpointId);
+    }
+
     private static class NoOpCommitter implements Committer<Integer> {
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
index b12fed07b75..af9e4070d61 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
 
@@ -35,6 +36,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertAll;
 
 class CommittableCollectorSerializerTest {
 
@@ -72,38 +74,88 @@ class CommittableCollectorSerializerTest {
 
     @Test
     void testCommittableCollectorV2SerDe() throws IOException {
-        final CommittableCollector<Integer> committableCollector = new 
CommittableCollector<>(2, 3);
-        committableCollector.addMessage(new CommittableSummary<>(2, 3, 1L, 1, 
1, 0));
-        committableCollector.addMessage(new CommittableSummary<>(2, 3, 2L, 1, 
1, 0));
-        committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, 
2));
-        committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, 
2));
+        int subtaskId = 2;
+        int numberOfSubtasks = 3;
+
+        final CommittableCollectorSerializer<Integer> ccSerializer =
+                new CommittableCollectorSerializer<>(
+                        COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks);
+
+        final CommittableCollector<Integer> committableCollector =
+                new CommittableCollector<>(subtaskId, numberOfSubtasks);
+        committableCollector.addMessage(
+                new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1, 
1, 0));
+        committableCollector.addMessage(
+                new CommittableSummary<>(subtaskId, numberOfSubtasks, 2L, 1, 
1, 0));
+        committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, 
subtaskId));
+        committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, 
subtaskId));
+
         final CommittableCollector<Integer> copy =
-                SERIALIZER.deserialize(2, 
SERIALIZER.serialize(committableCollector));
+                ccSerializer.deserialize(2, 
SERIALIZER.serialize(committableCollector));
 
         // Expect the subtask Id equal to the origin of the collector
-        assertThat(copy.getSubtaskId()).isEqualTo(1);
+        assertThat(copy.getSubtaskId()).isEqualTo(subtaskId);
         assertThat(copy.isFinished()).isFalse();
-        assertThat(copy.getNumberOfSubtasks()).isEqualTo(1);
-        final Collection<CheckpointCommittableManagerImpl<Integer>> 
checkpointCommittables =
-                committableCollector.getCheckpointCommittables();
-        assertThat(checkpointCommittables).hasSize(2);
-        final Iterator<CheckpointCommittableManagerImpl<Integer>> 
committablesIterator =
-                checkpointCommittables.iterator();
-        final SubtaskCommittableManager<Integer> 
subtaskCommittableManagerCheckpoint1 =
-                committablesIterator.next().getSubtaskCommittableManager(2);
-        assertThat(
-                        subtaskCommittableManagerCheckpoint1
-                                .getPendingRequests()
-                                .map(CommitRequestImpl::getCommittable)
-                                .collect(Collectors.toList()))
-                .containsExactly(1);
-        final SubtaskCommittableManager<Integer> 
subtaskCommittableManagerCheckpoint2 =
-                committablesIterator.next().getSubtaskCommittableManager(2);
-        assertThat(
-                        subtaskCommittableManagerCheckpoint2
-                                .getPendingRequests()
-                                .map(CommitRequestImpl::getCommittable)
-                                .collect(Collectors.toList()))
-                .containsExactly(2);
+        assertThat(copy.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
+
+        // assert original CommittableCollector
+        assertCommittableCollector(
+                "Original CommittableCollector", subtaskId, numberOfSubtasks, 
committableCollector);
+
+        // assert deserialized CommittableCollector
+        assertCommittableCollector(
+                "Deserialized CommittableCollector", subtaskId, 
numberOfSubtasks, copy);
+    }
+
+    private void assertCommittableCollector(
+            String assertMessageHeading,
+            int subtaskId,
+            int numberOfSubtasks,
+            CommittableCollector<Integer> committableCollector) {
+
+        assertAll(
+                assertMessageHeading,
+                () -> {
+                    final Collection<CheckpointCommittableManagerImpl<Integer>>
+                            checkpointCommittables =
+                                    
committableCollector.getCheckpointCommittables();
+                    assertThat(checkpointCommittables).hasSize(2);
+
+                    final Iterator<CheckpointCommittableManagerImpl<Integer>> 
committablesIterator =
+                            checkpointCommittables.iterator();
+                    final CheckpointCommittableManagerImpl<Integer> 
checkpointCommittableManager1 =
+                            committablesIterator.next();
+                    final SubtaskCommittableManager<Integer> 
subtaskCommittableManagerCheckpoint1 =
+                            
checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);
+
+                    
SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
+                            .hasSubtaskId(subtaskId)
+                            .hasNumberOfSubtasks(numberOfSubtasks);
+                    assertThat(
+                                    subtaskCommittableManagerCheckpoint1
+                                            .getPendingRequests()
+                                            
.map(CommitRequestImpl::getCommittable)
+                                            .collect(Collectors.toList()))
+                            .containsExactly(1);
+                    
assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
+                            .isEqualTo(subtaskId);
+
+                    final CheckpointCommittableManagerImpl<Integer> 
checkpointCommittableManager2 =
+                            committablesIterator.next();
+                    final SubtaskCommittableManager<Integer> 
subtaskCommittableManagerCheckpoint2 =
+                            
checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);
+
+                    
SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
+                            .hasSubtaskId(subtaskId)
+                            .hasNumberOfSubtasks(numberOfSubtasks);
+                    assertThat(
+                                    subtaskCommittableManagerCheckpoint2
+                                            .getPendingRequests()
+                                            
.map(CommitRequestImpl::getCommittable)
+                                            .collect(Collectors.toList()))
+                            .containsExactly(2);
+                    
assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
+                            .isEqualTo(subtaskId);
+                });
     }
 }

Reply via email to