AHeise commented on code in PR #26518:
URL: https://github.com/apache/flink/pull/26518#discussion_r2072904649
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -125,6 +151,45 @@ private static Record<Integer> flipValue(Record<Integer>
r) {
return r.withValue(-r.getValue());
}
+ @ParameterizedTest
+ @MethodSource("getScalingParallelism")
+ public void writerAndCommitterExecuteInStreamingModeWithScalingUp(
Review Comment:
Test looks great, but it doesn't fail on master (i.e., without the fix), so
it doesn't yet reproduce the bug. I'll comment inline on what needs to be
adjusted.
2 Nits:
* Please use CsvSource
* Please rename to ...WithRescaling
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java:
##########
@@ -165,80 +165,21 @@ void ensureAllCommittablesArrivedBeforeCommitting()
throws Exception {
}
@Test
- void testStateRestore() throws Exception {
+ void testStateRestoreForScalingUp() throws Exception {
- final int originalSubtaskId = 0;
- final int subtaskIdAfterRecovery = 9;
-
- final OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- testHarness =
- createTestHarness(
- sinkWithPostCommitWithRetry().sink,
- false,
- true,
- 1,
- 1,
- originalSubtaskId);
- testHarness.open();
-
- // We cannot test a different checkpoint than 0 because when using
the OperatorTestHarness
- // for recovery the lastCompleted checkpoint is always reset to 0.
- long checkpointId = 0L;
-
- final CommittableSummary<String> committableSummary =
- new CommittableSummary<>(originalSubtaskId, 1, checkpointId,
1, 0);
- testHarness.processElement(new StreamRecord<>(committableSummary));
- final CommittableWithLineage<String> first =
- new CommittableWithLineage<>("1", checkpointId,
originalSubtaskId);
- testHarness.processElement(new StreamRecord<>(first));
-
- // another committable for the same checkpointId but from a different
subtask.
- final CommittableSummary<String> committableSummary2 =
- new CommittableSummary<>(originalSubtaskId + 1, 1,
checkpointId, 1, 0);
- testHarness.processElement(new StreamRecord<>(committableSummary2));
- final CommittableWithLineage<String> second =
- new CommittableWithLineage<>("2", checkpointId,
originalSubtaskId + 1);
- testHarness.processElement(new StreamRecord<>(second));
-
- final OperatorSubtaskState snapshot =
testHarness.snapshot(checkpointId, 2L);
- assertThat(testHarness.getOutput()).isEmpty();
- testHarness.close();
+ testStateRestore(1, 10, 9);
+ }
Review Comment:
Could we instead use a parameterized test just like in your IT? I prefer
understanding a test without needing to hop around in the test code.
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -125,6 +151,45 @@ private static Record<Integer> flipValue(Record<Integer>
r) {
return r.withValue(-r.getValue());
}
+ @ParameterizedTest
+ @MethodSource("getScalingParallelism")
+ public void writerAndCommitterExecuteInStreamingModeWithScalingUp(
+ int initialParallelism,
+ int scaledParallelism,
+ @TempDir File checkpointDir,
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> clusterClient)
+ throws Exception {
+ SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>>
committed =
+ SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+ final TrackingCommitter trackingCommitter = new
TrackingCommitter(committed);
+ final Configuration config = createConfigForScalingTest(checkpointDir,
initialParallelism);
+
+ // first run
+ final JobID jobID =
+ runStreamingWithScalingTest(
+ config,
+ initialParallelism,
+ trackingCommitter,
+ true,
+ miniCluster,
+ clusterClient);
+
+ // second run
+ config.set(StateRecoveryOptions.SAVEPOINT_PATH,
getCheckpointPath(miniCluster, jobID));
+ config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+ runStreamingWithScalingTest(
+ config, initialParallelism, trackingCommitter, false,
miniCluster, clusterClient);
+
+ assertThat(committed.get())
+ .extracting(Committer.CommitRequest::getCommittable)
+
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
+ }
+
Review Comment:
private static List<Record<Integer>> duplicate(List<Record<Integer>>
values) {
return IntStream.range(0, 2)
.boxed()
.flatMap(i -> values.stream())
.collect(Collectors.toList());
}
```suggestion
```
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -184,6 +249,71 @@ private StreamExecutionEnvironment buildStreamEnv() {
return env;
}
+ private Configuration createConfigForScalingTest(File checkpointDir, int
parallelism) {
+ final Configuration config = new Configuration();
+ config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
+ config.set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+ config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+ return config;
+ }
+
+ private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(
+ Configuration config, int parallelism, MiniCluster miniCluster) {
+ final StreamExecutionEnvironment env =
+ new TestStreamEnvironment(
+ miniCluster,
+ config,
+ parallelism,
+ Collections.emptyList(),
+ Collections.emptyList());
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(100);
+
+ return env;
+ }
+
+ private JobID runStreamingWithScalingTest(
+ Configuration config,
+ int parallelism,
+ TrackingCommitter trackingCommitter,
+ boolean shouldMapperFail,
+ MiniCluster miniCluster,
+ ClusterClient<?> clusterClient)
+ throws Exception {
+ final StreamExecutionEnvironment env =
+ buildStreamEnvWithCheckpointDir(config, parallelism,
miniCluster);
+ final Source<Integer, ?, ?> source = createStreamingSource();
+
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ .map(
+ new FailingCheckpointMapper(
+ SHARED_OBJECTS.add(new
AtomicBoolean(shouldMapperFail))))
+ .sinkTo(
+ TestSinkV2.<Integer>newBuilder()
+ .setCommitter(trackingCommitter,
RecordSerializer::new)
+ .build());
Review Comment:
This will enable global committer. So now we expect output twice!
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -125,6 +151,45 @@ private static Record<Integer> flipValue(Record<Integer>
r) {
return r.withValue(-r.getValue());
}
+ @ParameterizedTest
+ @MethodSource("getScalingParallelism")
+ public void writerAndCommitterExecuteInStreamingModeWithScalingUp(
+ int initialParallelism,
+ int scaledParallelism,
+ @TempDir File checkpointDir,
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> clusterClient)
+ throws Exception {
+ SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>>
committed =
+ SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+ final TrackingCommitter trackingCommitter = new
TrackingCommitter(committed);
+ final Configuration config = createConfigForScalingTest(checkpointDir,
initialParallelism);
+
+ // first run
+ final JobID jobID =
+ runStreamingWithScalingTest(
+ config,
+ initialParallelism,
+ trackingCommitter,
+ true,
+ miniCluster,
+ clusterClient);
+
+ // second run
+ config.set(StateRecoveryOptions.SAVEPOINT_PATH,
getCheckpointPath(miniCluster, jobID));
+ config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+ runStreamingWithScalingTest(
+ config, initialParallelism, trackingCommitter, false,
miniCluster, clusterClient);
+
+ assertThat(committed.get())
+ .extracting(Committer.CommitRequest::getCommittable)
+
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
Review Comment:
```suggestion
.containsExactlyInAnyOrderElementsOf(
duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
```
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -184,6 +249,71 @@ private StreamExecutionEnvironment buildStreamEnv() {
return env;
}
+ private Configuration createConfigForScalingTest(File checkpointDir, int
parallelism) {
+ final Configuration config = new Configuration();
+ config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
+ config.set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+ config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+ return config;
+ }
+
+ private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(
+ Configuration config, int parallelism, MiniCluster miniCluster) {
+ final StreamExecutionEnvironment env =
+ new TestStreamEnvironment(
+ miniCluster,
+ config,
+ parallelism,
+ Collections.emptyList(),
+ Collections.emptyList());
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(100);
+
+ return env;
+ }
+
+ private JobID runStreamingWithScalingTest(
+ Configuration config,
+ int parallelism,
+ TrackingCommitter trackingCommitter,
+ boolean shouldMapperFail,
+ MiniCluster miniCluster,
+ ClusterClient<?> clusterClient)
+ throws Exception {
+ final StreamExecutionEnvironment env =
+ buildStreamEnvWithCheckpointDir(config, parallelism,
miniCluster);
+ final Source<Integer, ?, ?> source = createStreamingSource();
+
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ .map(
+ new FailingCheckpointMapper(
+ SHARED_OBJECTS.add(new
AtomicBoolean(shouldMapperFail))))
Review Comment:
```suggestion
SHARED_OBJECTS.add(new
AtomicBoolean(!shouldMapperFail))))
```
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -183,7 +183,10 @@ private void
commitAndEmit(CheckpointCommittableManager<CommT> committableManage
private void emit(CheckpointCommittableManager<CommT> committableManager) {
int subtaskId =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
- int numberOfSubtasks =
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ int numberOfSubtasks =
Review Comment:
Could you please add a comment explaining the change in 1-2 lines (e.g.
"Ensure that numberOfSubtasks is in sync with the number of actually emitted
CommittableSummaries during upscaling recovery (see FLINK-37747).").
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -184,6 +249,71 @@ private StreamExecutionEnvironment buildStreamEnv() {
return env;
}
+ private Configuration createConfigForScalingTest(File checkpointDir, int
parallelism) {
+ final Configuration config = new Configuration();
+ config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
+ config.set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+ config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+ return config;
+ }
+
+ private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(
+ Configuration config, int parallelism, MiniCluster miniCluster) {
+ final StreamExecutionEnvironment env =
+ new TestStreamEnvironment(
+ miniCluster,
+ config,
+ parallelism,
+ Collections.emptyList(),
+ Collections.emptyList());
Review Comment:
```suggestion
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
```
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -184,6 +249,71 @@ private StreamExecutionEnvironment buildStreamEnv() {
return env;
}
+ private Configuration createConfigForScalingTest(File checkpointDir, int
parallelism) {
+ final Configuration config = new Configuration();
+ config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
+ config.set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+ config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+ return config;
+ }
+
+ private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(
+ Configuration config, int parallelism, MiniCluster miniCluster) {
+ final StreamExecutionEnvironment env =
+ new TestStreamEnvironment(
+ miniCluster,
+ config,
+ parallelism,
+ Collections.emptyList(),
+ Collections.emptyList());
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(100);
+
+ return env;
+ }
+
+ private JobID runStreamingWithScalingTest(
+ Configuration config,
+ int parallelism,
+ TrackingCommitter trackingCommitter,
+ boolean shouldMapperFail,
+ MiniCluster miniCluster,
+ ClusterClient<?> clusterClient)
+ throws Exception {
+ final StreamExecutionEnvironment env =
+ buildStreamEnvWithCheckpointDir(config, parallelism,
miniCluster);
+ final Source<Integer, ?, ?> source = createStreamingSource();
+
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ .map(
+ new FailingCheckpointMapper(
+ SHARED_OBJECTS.add(new
AtomicBoolean(shouldMapperFail))))
+ .sinkTo(
+ TestSinkV2.<Integer>newBuilder()
+ .setCommitter(trackingCommitter,
RecordSerializer::new)
+ .build());
Review Comment:
```suggestion
.setWithPostCommitTopology(true)
.build());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]