This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d718342ad0311ea7481d1a1bd87c395aeff25928
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Feb 20 20:30:19 2023 +0000

    [hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match
    
    Currently, a wrong allocation fails the task and causes a restart,
    which eventually fixes the allocation by picking the right TM.
    This keeps the test from failing and hides the wrong allocation.
---
 .../flink/test/recovery/LocalRecoveryITCase.java   | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
index 1639cda95b9..c8e505c9c9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -67,12 +68,14 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests local recovery by restarting Flink processes. */
 @ExtendWith(TestLoggerExtension.class)
 class LocalRecoveryITCase {
 
+    private static final String ALLOCATION_FAILURES_ACCUMULATOR_NAME = "acc";
+
     @TempDir private File tmpDirectory;
 
     @Test
@@ -108,7 +111,12 @@ class LocalRecoveryITCase {
 
             restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);
 
-            jobClient.getJobExecutionResult().get(waitingTimeInSeconds, 
TimeUnit.SECONDS);
+            List<String> allocFailures =
+                    jobClient
+                            .getJobExecutionResult()
+                            .get(waitingTimeInSeconds, TimeUnit.SECONDS)
+                            
.getAccumulatorResult(ALLOCATION_FAILURES_ACCUMULATOR_NAME);
+            assertTrue(allocFailures.isEmpty(), allocFailures.toString());
 
             success = true;
         } finally {
@@ -307,11 +315,17 @@ class LocalRecoveryITCase {
                                         new IllegalStateException(
                                                 "Could not find corresponding 
TaskNameAllocationID information."));
 
-                assertThat(myTaskNameAllocationId.getAllocationId())
-                        .withFailMessage(
-                                "The task was deployed to AllocationID(%s) but 
it should have been deployed to AllocationID(%s) for local recovery.",
-                                allocationId, 
myTaskNameAllocationId.getAllocationId())
-                        .isEqualTo(allocationId);
+                runtimeContext.addAccumulator(
+                        ALLOCATION_FAILURES_ACCUMULATOR_NAME, new 
ListAccumulator<String>());
+                if 
(!allocationId.equals(myTaskNameAllocationId.getAllocationId())) {
+                    runtimeContext
+                            
.getAccumulator(ALLOCATION_FAILURES_ACCUMULATOR_NAME)
+                            .add(
+                                    String.format(
+                                            "The task was deployed to 
AllocationID(%s) but it should have been deployed to AllocationID(%s) for local 
recovery.",
+                                            allocationId,
+                                            
myTaskNameAllocationId.getAllocationId()));
+                }
                 // terminate
                 running = false;
             }

Reply via email to