qiuyanjun888 commented on code in PR #28507:
URL: https://github.com/apache/flink/pull/28507#discussion_r3454471237
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java:
##########
@@ -186,10 +187,30 @@ void writerAndCommitterExecuteInStreamingModeWithScaling(
runStreamingWithScalingTest(
config, initialParallelism, trackingCommitter, false,
miniCluster, clusterClient);
- assertThat(committed.get())
- .extracting(Committer.CommitRequest::getCommittable)
- .containsExactlyInAnyOrderElementsOf(
- duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
+ assertRecordsContainAtLeastExpected(
+ committed.get().stream()
+ .map(Committer.CommitRequest::getCommittable)
+ .collect(Collectors.toList()),
+ duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
+ }
+
+ @Test
+ void committedRecordAssertionAllowsAdditionalRetriedCommittables() {
+ final List<Record<Integer>> expected =
duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
+ final List<Record<Integer>> actual = new ArrayList<>(expected);
+ actual.add(expected.get(0));
+
+ assertRecordsContainAtLeastExpected(actual, expected);
+ }
+
+ private static void assertRecordsContainAtLeastExpected(
+ Collection<Record<Integer>> actual, List<Record<Integer>>
expected) {
+ final List<Record<Integer>> remainingActual = new ArrayList<>(actual);
+ for (Record<Integer> expectedRecord : expected) {
+ assertThat(remainingActual).contains(expectedRecord);
+ remainingActual.remove(expectedRecord);
+ }
+ assertThat(remainingActual).allSatisfy(record ->
assertThat(expected).contains(record));
Review Comment:
Fixed by applying the suggested `isSubsetOf(expected)` assertion for the
remaining actual records.
Verification:
- `git diff --check`
- `JAVA_HOME=/opt/data/jdks/jdk-17.0.19+10 ./mvnw -pl flink-tests
-Dtest=SinkV2ITCase#committedRecordAssertionAllowsAdditionalRetriedCommittables
test`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]