This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d718342ad0311ea7481d1a1bd87c395aeff25928 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Mon Feb 20 20:30:19 2023 +0000 [hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match Currently, a wrong allocation fails the task and causes a restart, which eventually allows the allocation to be fixed by picking the right TM. This prevents the test from failing and hides the wrong allocation. --- .../flink/test/recovery/LocalRecoveryITCase.java | 28 ++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java index 1639cda95b9..c8e505c9c9f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.ListAccumulator; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.CheckpointingOptions; @@ -67,12 +68,14 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests local recovery by restarting Flink processes.
*/ @ExtendWith(TestLoggerExtension.class) class LocalRecoveryITCase { + private static final String ALLOCATION_FAILURES_ACCUMULATOR_NAME = "acc"; + @TempDir private File tmpDirectory; @Test @@ -108,7 +111,12 @@ class LocalRecoveryITCase { restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1); - jobClient.getJobExecutionResult().get(waitingTimeInSeconds, TimeUnit.SECONDS); + List<String> allocFailures = + jobClient + .getJobExecutionResult() + .get(waitingTimeInSeconds, TimeUnit.SECONDS) + .getAccumulatorResult(ALLOCATION_FAILURES_ACCUMULATOR_NAME); + assertTrue(allocFailures.isEmpty(), allocFailures.toString()); success = true; } finally { @@ -307,11 +315,17 @@ class LocalRecoveryITCase { new IllegalStateException( "Could not find corresponding TaskNameAllocationID information.")); - assertThat(myTaskNameAllocationId.getAllocationId()) - .withFailMessage( - "The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", - allocationId, myTaskNameAllocationId.getAllocationId()) - .isEqualTo(allocationId); + runtimeContext.addAccumulator( + ALLOCATION_FAILURES_ACCUMULATOR_NAME, new ListAccumulator<String>()); + if (!allocationId.equals(myTaskNameAllocationId.getAllocationId())) { + runtimeContext + .getAccumulator(ALLOCATION_FAILURES_ACCUMULATOR_NAME) + .add( + String.format( + "The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", + allocationId, + myTaskNameAllocationId.getAllocationId())); + } // terminate running = false; }