1996fanrui commented on code in PR #23227:
URL: https://github.com/apache/flink/pull/23227#discussion_r1298640863


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -481,63 +480,61 @@ public static ExecutionAttemptID createExecutionAttemptId(
      * @param outputJobVertices downstream vertices of the verified vertex, 
used to check produced
      *     data sets of generated vertex
      */
-    public static void verifyGeneratedExecutionJobVertex(
+    static void verifyGeneratedExecutionJobVertex(
             ExecutionGraph executionGraph,
             JobVertex originJobVertex,
             @Nullable List<JobVertex> inputJobVertices,
             @Nullable List<JobVertex> outputJobVertices) {
 
         ExecutionJobVertex ejv = 
executionGraph.getAllVertices().get(originJobVertex.getID());
-        assertNotNull(ejv);
+        assertThat(ejv).isNotNull();
 
         // verify basic properties
-        assertEquals(originJobVertex.getParallelism(), ejv.getParallelism());
-        assertEquals(executionGraph.getJobID(), ejv.getJobId());
-        assertEquals(originJobVertex.getID(), ejv.getJobVertexId());
-        assertEquals(originJobVertex, ejv.getJobVertex());
+        
assertThat(originJobVertex.getParallelism()).isEqualTo(ejv.getParallelism());
+        assertThat(executionGraph.getJobID()).isEqualTo(ejv.getJobId());
+        assertThat(originJobVertex.getID()).isEqualTo(ejv.getJobVertexId());
+        assertThat(originJobVertex).isEqualTo(ejv.getJobVertex());
 
         // verify produced data sets
         if (outputJobVertices == null) {
-            assertEquals(0, ejv.getProducedDataSets().length);
+            assertThat(ejv.getProducedDataSets().length).isZero();
         } else {
-            assertEquals(outputJobVertices.size(), 
ejv.getProducedDataSets().length);
+            
assertThat(outputJobVertices.size()).isEqualTo(ejv.getProducedDataSets().length);

Review Comment:
   ```suggestion
               
assertThat(outputJobVertices).hasSize(ejv.getProducedDataSets().length);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -221,7 +221,7 @@ void testBuildDeploymentDescriptor() throws Exception {
         Collection<InputGateDeploymentDescriptor> consumedPartitions = 
descr.getInputGates();
 
         assertThat(producedPartitions.size()).isEqualTo(2);
-        assertThat(consumedPartitions.size()).isEqualTo(1);
+        assertThat(consumedPartitions.size()).isOne();

Review Comment:
   ```suggestion
           assertThat(consumedPartitions).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -481,63 +480,61 @@ public static ExecutionAttemptID createExecutionAttemptId(
      * @param outputJobVertices downstream vertices of the verified vertex, 
used to check produced
      *     data sets of generated vertex
      */
-    public static void verifyGeneratedExecutionJobVertex(
+    static void verifyGeneratedExecutionJobVertex(
             ExecutionGraph executionGraph,
             JobVertex originJobVertex,
             @Nullable List<JobVertex> inputJobVertices,
             @Nullable List<JobVertex> outputJobVertices) {
 
         ExecutionJobVertex ejv = 
executionGraph.getAllVertices().get(originJobVertex.getID());
-        assertNotNull(ejv);
+        assertThat(ejv).isNotNull();
 
         // verify basic properties
-        assertEquals(originJobVertex.getParallelism(), ejv.getParallelism());
-        assertEquals(executionGraph.getJobID(), ejv.getJobId());
-        assertEquals(originJobVertex.getID(), ejv.getJobVertexId());
-        assertEquals(originJobVertex, ejv.getJobVertex());
+        
assertThat(originJobVertex.getParallelism()).isEqualTo(ejv.getParallelism());
+        assertThat(executionGraph.getJobID()).isEqualTo(ejv.getJobId());
+        assertThat(originJobVertex.getID()).isEqualTo(ejv.getJobVertexId());
+        assertThat(originJobVertex).isEqualTo(ejv.getJobVertex());
 
         // verify produced data sets
         if (outputJobVertices == null) {
-            assertEquals(0, ejv.getProducedDataSets().length);
+            assertThat(ejv.getProducedDataSets().length).isZero();
         } else {
-            assertEquals(outputJobVertices.size(), 
ejv.getProducedDataSets().length);
+            
assertThat(outputJobVertices.size()).isEqualTo(ejv.getProducedDataSets().length);
             for (int i = 0; i < outputJobVertices.size(); i++) {
-                assertEquals(
-                        originJobVertex.getProducedDataSets().get(i).getId(),
-                        ejv.getProducedDataSets()[i].getId());
-                assertEquals(
-                        originJobVertex.getParallelism(),
-                        ejv.getProducedDataSets()[0].getPartitions().length);
+                
assertThat(originJobVertex.getProducedDataSets().get(i).getId())
+                        .isEqualTo(ejv.getProducedDataSets()[i].getId());
+                assertThat(originJobVertex.getParallelism())
+                        
.isEqualTo(ejv.getProducedDataSets()[0].getPartitions().length);
             }
         }
 
         // verify task vertices for their basic properties and their inputs
-        assertEquals(originJobVertex.getParallelism(), 
ejv.getTaskVertices().length);
+        
assertThat(originJobVertex.getParallelism()).isEqualTo(ejv.getTaskVertices().length);
 
         int subtaskIndex = 0;
         for (ExecutionVertex ev : ejv.getTaskVertices()) {
-            assertEquals(executionGraph.getJobID(), ev.getJobId());
-            assertEquals(originJobVertex.getID(), ev.getJobvertexId());
+            assertThat(executionGraph.getJobID()).isEqualTo(ev.getJobId());
+            assertThat(originJobVertex.getID()).isEqualTo(ev.getJobvertexId());
 
-            assertEquals(originJobVertex.getParallelism(), 
ev.getTotalNumberOfParallelSubtasks());
-            assertEquals(subtaskIndex, ev.getParallelSubtaskIndex());
+            assertThat(originJobVertex.getParallelism())
+                    .isEqualTo(ev.getTotalNumberOfParallelSubtasks());
+            assertThat(subtaskIndex).isEqualTo(ev.getParallelSubtaskIndex());
 
             if (inputJobVertices == null) {
-                assertEquals(0, ev.getNumberOfInputs());
+                assertThat(ev.getNumberOfInputs()).isZero();
             } else {
-                assertEquals(inputJobVertices.size(), ev.getNumberOfInputs());
+                
assertThat(inputJobVertices.size()).isEqualTo(ev.getNumberOfInputs());

Review Comment:
   ```suggestion
                   assertThat(inputJobVertices).hasSize(ev.getNumberOfInputs());
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -375,13 +369,13 @@ public void testFailExecutionAfterCancel() throws 
Exception {
                 v.getCurrentExecutionAttempt().fail(new Exception("Test 
Exception"));
             }
 
-            assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
+            
assertThat(eg.getTerminationFuture().get()).isEqualTo(JobStatus.CANCELED);

Review Comment:
   ```suggestion
               
FlinkAssertions.assertThatFuture(eg.getTerminationFuture()).eventuallySucceeds().isEqualTo(JobStatus.CANCELED);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -257,23 +254,24 @@ public void testSendCancelAndReceiveFail() throws 
Exception {
         scheduler.startScheduling();
 
         ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
-        assertEquals(JobStatus.RUNNING, graph.getState());
+        assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
 
         final ExecutionVertex[] vertices =
                 
graph.getVerticesTopologically().iterator().next().getTaskVertices();
-        assertEquals(vertices.length, graph.getRegisteredExecutions().size());
+        
assertThat(graph.getRegisteredExecutions().size()).isEqualTo(vertices.length);

Review Comment:
   ```suggestion
           assertThat(graph.getRegisteredExecutions()).hasSize(vertices.length);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -257,23 +254,24 @@ public void testSendCancelAndReceiveFail() throws 
Exception {
         scheduler.startScheduling();
 
         ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
-        assertEquals(JobStatus.RUNNING, graph.getState());
+        assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
 
         final ExecutionVertex[] vertices =
                 
graph.getVerticesTopologically().iterator().next().getTaskVertices();
-        assertEquals(vertices.length, graph.getRegisteredExecutions().size());
+        
assertThat(graph.getRegisteredExecutions().size()).isEqualTo(vertices.length);
 
         final Execution exec = vertices[3].getCurrentExecutionAttempt();
         exec.cancel();
-        assertEquals(ExecutionState.CANCELING, exec.getState());
+        assertThat(exec.getState()).isEqualTo(ExecutionState.CANCELING);
 
         exec.markFailed(new Exception("test"));
-        assertTrue(
-                exec.getState() == ExecutionState.FAILED
-                        || exec.getState() == ExecutionState.CANCELED);
+        assertThat(
+                        exec.getState() == ExecutionState.FAILED
+                                || exec.getState() == ExecutionState.CANCELED)
+                .isTrue();
 
-        assertFalse(exec.getAssignedResource().isAlive());
-        assertEquals(vertices.length - 1, 
graph.getRegisteredExecutions().size());
+        assertThat(exec.getAssignedResource().isAlive()).isFalse();
+        
assertThat(graph.getRegisteredExecutions().size()).isEqualTo(vertices.length - 
1);

Review Comment:
   ```suggestion
           assertThat(graph.getRegisteredExecutions()).hasSize(vertices.length - 1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtilTest.java:
##########
@@ -206,7 +205,9 @@ private static Set<Set<LogicalVertex>> 
computePipelinedRegions(JobVertex... vert
 
     private static void checkRegionSize(
             Set<Set<LogicalVertex>> regions, int numOfRegions, int... sizes) {
-        assertEquals(numOfRegions, regions.size());
-        
containsInAnyOrder(regions.stream().map(Set::size).collect(Collectors.toList()),
 sizes);
+        assertThat(regions.size()).isEqualTo(numOfRegions);

Review Comment:
   ```suggestion
           assertThat(regions).hasSize(numOfRegions);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java:
##########
@@ -224,35 +217,35 @@ private void testPartitionTrackingForStateTransition(
 
         Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingCall =
                 partitionStartTrackingFuture.get();
-        assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
-        assertThat(startTrackingCall.f1, equalTo(descriptor));
+        assertThat(startTrackingCall.f0).isEqualTo(taskExecutorResourceId);
+        assertThat(startTrackingCall.f1).isEqualTo(descriptor);
 
         stateTransition.accept(execution);
 
         switch (partitionReleaseResult) {
             case NONE:
-                assertFalse(partitionStopTrackingFuture.isDone());
-                assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isFalse();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
                   assertThat(partitionStopTrackingFuture).isNotDone();
                   
assertThat(partitionStopTrackingAndReleaseFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java:
##########
@@ -224,35 +217,35 @@ private void testPartitionTrackingForStateTransition(
 
         Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingCall =
                 partitionStartTrackingFuture.get();
-        assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
-        assertThat(startTrackingCall.f1, equalTo(descriptor));
+        assertThat(startTrackingCall.f0).isEqualTo(taskExecutorResourceId);
+        assertThat(startTrackingCall.f1).isEqualTo(descriptor);
 
         stateTransition.accept(execution);
 
         switch (partitionReleaseResult) {
             case NONE:
-                assertFalse(partitionStopTrackingFuture.isDone());
-                assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isFalse();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isFalse();
                 break;
             case STOP_TRACKING:
-                assertTrue(partitionStopTrackingFuture.isDone());
-                assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isTrue();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
                   assertThat(partitionStopTrackingFuture).isDone();
                   assertThat(partitionStopTrackingAndReleaseFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java:
##########
@@ -106,7 +101,7 @@ public void 
testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
         CompletableFuture<Boolean> restartFuture =
                 terminationFuture.thenApply(
                         ignored -> {
-                            assertTrue(returnedSlotFuture.isDone());
+                            assertThat(returnedSlotFuture.isDone()).isTrue();

Review Comment:
   ```suggestion
                               assertThat(returnedSlotFuture).isDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java:
##########
@@ -224,35 +217,35 @@ private void testPartitionTrackingForStateTransition(
 
         Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingCall =
                 partitionStartTrackingFuture.get();
-        assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
-        assertThat(startTrackingCall.f1, equalTo(descriptor));
+        assertThat(startTrackingCall.f0).isEqualTo(taskExecutorResourceId);
+        assertThat(startTrackingCall.f1).isEqualTo(descriptor);
 
         stateTransition.accept(execution);
 
         switch (partitionReleaseResult) {
             case NONE:
-                assertFalse(partitionStopTrackingFuture.isDone());
-                assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isFalse();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isFalse();
                 break;
             case STOP_TRACKING:
-                assertTrue(partitionStopTrackingFuture.isDone());
-                assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isTrue();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isFalse();
                 final Collection<ResultPartitionID> stopTrackingCall =
                         partitionStopTrackingFuture.get();
-                assertEquals(
-                        Collections.singletonList(
-                                
descriptor.getShuffleDescriptor().getResultPartitionID()),
-                        stopTrackingCall);
+                assertThat(
+                                Collections.singletonList(
+                                        
descriptor.getShuffleDescriptor().getResultPartitionID()))
+                        .isEqualTo(stopTrackingCall);
                 break;
             case STOP_TRACKING_AND_RELEASE:
-                assertFalse(partitionStopTrackingFuture.isDone());
-                assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+                assertThat(partitionStopTrackingFuture.isDone()).isFalse();
+                
assertThat(partitionStopTrackingAndReleaseFuture.isDone()).isTrue();

Review Comment:
   ```suggestion
                   assertThat(partitionStopTrackingFuture).isNotDone();
                   
assertThat(partitionStopTrackingAndReleaseFuture).isDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java:
##########
@@ -232,13 +226,12 @@ public void testSlotReleaseAtomicallyReleasesExecution() 
throws Exception {
         TestingPhysicalSlot physicalSlot = 
physicalSlotProvider.getFirstResponseOrFail().get();
         testMainThreadUtil.execute(
                 () -> {
-                    assertThat(
-                            execution.getAssignedAllocationID(),
-                            is(physicalSlot.getAllocationId()));
+                    assertThat(execution.getAssignedAllocationID())
+                            .isEqualTo(physicalSlot.getAllocationId());
 
                     physicalSlot.releasePayload(new FlinkException("Test 
exception"));
 
-                    assertThat(execution.getReleaseFuture().isDone(), 
is(true));
+                    assertThat(execution.getReleaseFuture().isDone()).isTrue();

Review Comment:
   ```suggestion
                       assertThat(execution.getReleaseFuture()).isDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -45,69 +44,67 @@
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for cancelling {@link ExecutionVertex ExecutionVertices}. */
-public class ExecutionVertexCancelTest extends TestLogger {
+class ExecutionVertexCancelTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     // 
--------------------------------------------------------------------------------------------
     //  Canceling in different states
     // 
--------------------------------------------------------------------------------------------
 
     @Test
-    public void testCancelFromCreated() {
+    void testCancelFromCreated() {
         try {
             final ExecutionVertex vertex = getExecutionVertex();
 
-            assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();

Review Comment:
   Use `isGreaterThan(0)` here instead of asserting `> 0` with `isTrue()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -45,69 +44,67 @@
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for cancelling {@link ExecutionVertex ExecutionVertices}. */
-public class ExecutionVertexCancelTest extends TestLogger {
+class ExecutionVertexCancelTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     // 
--------------------------------------------------------------------------------------------
     //  Canceling in different states
     // 
--------------------------------------------------------------------------------------------
 
     @Test
-    public void testCancelFromCreated() {
+    void testCancelFromCreated() {
         try {
             final ExecutionVertex vertex = getExecutionVertex();
 
-            assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
     @Test
-    public void testCancelFromScheduled() {
+    void testCancelFromScheduled() {
         try {
             final ExecutionVertex vertex = getExecutionVertex();
 
             setVertexState(vertex, ExecutionState.SCHEDULED);
-            assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();

Review Comment:
   Use `isGreaterThan(0)` here instead of asserting `> 0` with `isTrue()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -65,29 +59,29 @@ public void testDeployCall() {
                 // as expected
             }
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();

Review Comment:
   Use `isNotPresent()` for the failure-info check and `isGreaterThan(0)` for the timestamp checks.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -198,24 +195,24 @@ public void testCancelFromRunningDidNotFindTask() {
             setVertexResource(vertex, slot);
             setVertexState(vertex, ExecutionState.RUNNING);
 
-            assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();

Review Comment:
   Use `isNotPresent()` for the failure-info check and `isGreaterThan(0)` for the timestamp checks.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -45,69 +44,67 @@
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for cancelling {@link ExecutionVertex ExecutionVertices}. */
-public class ExecutionVertexCancelTest extends TestLogger {
+class ExecutionVertexCancelTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     // 
--------------------------------------------------------------------------------------------
     //  Canceling in different states
     // 
--------------------------------------------------------------------------------------------
 
     @Test
-    public void testCancelFromCreated() {
+    void testCancelFromCreated() {
         try {
             final ExecutionVertex vertex = getExecutionVertex();
 
-            assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
     @Test
-    public void testCancelFromScheduled() {
+    void testCancelFromScheduled() {
         try {
             final ExecutionVertex vertex = getExecutionVertex();
 
             setVertexState(vertex, ExecutionState.SCHEDULED);
-            assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();

Review Comment:
   ```suggestion
               assertThat(vertex.getFailureInfo()).isNotPresent();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -228,25 +225,25 @@ public void testCancelCallFails() {
             setVertexResource(vertex, slot);
             setVertexState(vertex, ExecutionState.RUNNING);
 
-            assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
 
             vertex.cancel();
 
             // Callback fails, leading to CANCELED
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(slot.isAlive());
+            assertThat(slot.isAlive()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();

Review Comment:
   Use `isGreaterThan(0)` here instead of asserting `> 0` with `isTrue()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -155,36 +152,36 @@ public void testRepeatedCancelFromRunning() {
             setVertexResource(vertex, slot);
             setVertexState(vertex, ExecutionState.RUNNING);
 
-            assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);
 
             vertex.cancel();
 
-            assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);
 
             // callback by TaskManager after canceling completes
             vertex.getCurrentExecutionAttempt().completeCancelling();
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(slot.isAlive());
+            assertThat(slot.isAlive()).isFalse();
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();

Review Comment:
   Use `isNotPresent()` and `isGreaterThan(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -97,25 +91,25 @@ public void testDeployWithSynchronousAnswer() {
                 // as expected
             }
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.RUNNING) == 
0).isTrue();

Review Comment:
   Use `isNotPresent()`, `isGreaterThan(0)` and `isEqualTo(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -158,28 +152,27 @@ public void testDeployFailedSynchronous() {
                                     new 
SubmitFailingSimpleAckingTaskManagerGateway())
                             .createTestingLogicalSlot();
 
-            assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
             
vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
 
             vertex.deployToSlot(slot);
 
-            assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-            assertTrue(vertex.getFailureInfo().isPresent());
-            assertThat(
-                    
vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get(),
-                    containsString(ERROR_MESSAGE));
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+            assertThat(vertex.getFailureInfo().isPresent()).isTrue();
+            
assertThat(vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get())
+                    .contains(ERROR_MESSAGE);
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.FAILED) > 
0).isTrue();

Review Comment:
    Use `isGreaterThan(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -138,17 +132,17 @@ public void testDeployWithAsynchronousAnswer() {
                 // as expected
             }
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.RUNNING) == 
0).isTrue();

Review Comment:
   Same as the last one.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -230,25 +222,25 @@ public void testFailExternallyDuringDeploy() {
                                     new 
SubmitBlockingSimpleAckingTaskManagerGateway())
                             .createTestingLogicalSlot();
 
-            assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
             
vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
             vertex.deployToSlot(testingLogicalSlot);
-            assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
 
             Exception testError = new Exception("test error");
             vertex.fail(testError);
 
-            assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
             assertThat(
-                    vertex.getFailureInfo()
-                            .map(ErrorInfo::getException)
-                            .get()
-                            
.deserializeError(ClassLoader.getSystemClassLoader()),
-                    is(testError));
-
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+                            vertex.getFailureInfo()
+                                    .map(ErrorInfo::getException)
+                                    .get()
+                                    
.deserializeError(ClassLoader.getSystemClassLoader()))
+                    .isEqualTo(testError);
+
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.FAILED) > 
0).isTrue();

Review Comment:
    Use `isGreaterThan(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java:
##########
@@ -90,11 +88,11 @@ public void testResetForNewExecutionReleasesPartitions() 
throws Exception {
         Execution execution =
                 
producerExecutionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
 
-        assertFalse(releasePartitionsFuture.isDone());
+        assertThat(releasePartitionsFuture.isDone()).isFalse();
 
         execution.markFinished();
 
-        assertFalse(releasePartitionsFuture.isDone());
+        assertThat(releasePartitionsFuture.isDone()).isFalse();

Review Comment:
   isNotDone



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java:
##########
@@ -204,23 +197,22 @@ public void testDeployFailedAsynchronously() {
                 }
             }
 
-            assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-            assertTrue(vertex.getFailureInfo().isPresent());
-            assertThat(
-                    
vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get(),
-                    containsString(ERROR_MESSAGE));
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+            assertThat(vertex.getFailureInfo().isPresent()).isTrue();
+            
assertThat(vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get())
+                    .contains(ERROR_MESSAGE);
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.FAILED) > 
0).isTrue();

Review Comment:
   Use `isPresent()` and `isGreaterThan(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -120,29 +117,29 @@ public void testCancelFromRunning() {
             setVertexResource(vertex, slot);
             setVertexState(vertex, ExecutionState.RUNNING);
 
-            assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
 
             vertex.cancel();
             vertex.getCurrentExecutionAttempt()
                     .completeCancelling(); // response by task manager once 
actually canceled
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(slot.isAlive());
+            assertThat(slot.isAlive()).isFalse();
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();
 
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+            assertThat(vertex.getStateTimestamp(ExecutionState.CREATED) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELING) > 
0).isTrue();
+            assertThat(vertex.getStateTimestamp(ExecutionState.CANCELED) > 
0).isTrue();

Review Comment:
   Use `isGreaterThan(0)` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java:
##########
@@ -120,29 +117,29 @@ public void testCancelFromRunning() {
             setVertexResource(vertex, slot);
             setVertexState(vertex, ExecutionState.RUNNING);
 
-            assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
 
             vertex.cancel();
             vertex.getCurrentExecutionAttempt()
                     .completeCancelling(); // response by task manager once 
actually canceled
 
-            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
 
-            assertFalse(slot.isAlive());
+            assertThat(slot.isAlive()).isFalse();
 
-            assertFalse(vertex.getFailureInfo().isPresent());
+            assertThat(vertex.getFailureInfo().isPresent()).isFalse();

Review Comment:
   ```suggestion
               assertThat(vertex.getFailureInfo()).isNotPresent();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to