This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 247d4263c1a484ba12ce5c7826ff938b83285d11
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Tue Jul 26 15:56:13 2022 +0800

    [hotfix][runtime][tests] Migrates some tests to Junit5
---
 .../executiongraph/ArchivedExecutionGraphTest.java | 356 +++++----------------
 .../ArchivedExecutionGraphTestUtils.java           | 170 ++++++++++
 .../messages/webmonitor/JobDetailsTest.java        |  15 +-
 .../metrics/dump/MetricDumpSerializerTest.java     | 135 ++++----
 .../runtime/metrics/dump/QueryScopeInfoTest.java   | 168 +++++-----
 .../job/JobVertexBackPressureHandlerTest.java      |  67 ++--
 .../handler/legacy/metrics/MetricStoreTest.java    |  50 +--
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  18 +-
 8 files changed, 475 insertions(+), 504 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index b6e9cefc2b4..4427c5b3042 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummarySnapshot;
@@ -42,44 +41,34 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ArchivedExecutionGraph}. */
-public class ArchivedExecutionGraphTest extends TestLogger {
+public class ArchivedExecutionGraphTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static ExecutionGraph runtimeGraph;
 
-    @BeforeClass
-    public static void setupExecutionGraph() throws Exception {
+    @BeforeAll
+    static void setupExecutionGraph() throws Exception {
         // 
-------------------------------------------------------------------------------------------------------------
         // Setup
         // 
-------------------------------------------------------------------------------------------------------------
@@ -146,21 +135,21 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
     }
 
     @Test
-    public void testArchive() throws IOException, ClassNotFoundException {
+    void testArchive() throws IOException, ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph = 
ArchivedExecutionGraph.createFrom(runtimeGraph);
 
         compareExecutionGraph(runtimeGraph, archivedGraph);
     }
 
     @Test
-    public void testSerialization() throws IOException, ClassNotFoundException 
{
+    void testSerialization() throws IOException, ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph = 
ArchivedExecutionGraph.createFrom(runtimeGraph);
 
         verifySerializability(archivedGraph);
     }
 
     @Test
-    public void testCreateFromInitializingJobForSuspendedJob() {
+    void testCreateFromInitializingJobForSuspendedJob() {
         final ArchivedExecutionGraph suspendedExecutionGraph =
                 ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                         new JobID(),
@@ -170,12 +159,12 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
                         null,
                         System.currentTimeMillis());
 
-        assertThat(suspendedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED));
-        assertThat(suspendedExecutionGraph.getFailureInfo(), notNullValue());
+        
assertThat(suspendedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
+        assertThat(suspendedExecutionGraph.getFailureInfo()).isNotNull();
     }
 
     @Test
-    public void testCheckpointSettingsArchiving() {
+    void testCheckpointSettingsArchiving() {
         final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration =
                 CheckpointCoordinatorConfiguration.builder().build();
 
@@ -192,19 +181,19 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
     }
 
     public static void assertContainsCheckpointSettings(ArchivedExecutionGraph 
archivedGraph) {
-        assertThat(archivedGraph.getCheckpointCoordinatorConfiguration(), 
notNullValue());
-        assertThat(archivedGraph.getCheckpointStatsSnapshot(), notNullValue());
-        assertThat(archivedGraph.getCheckpointStorageName().get(), 
is("Unknown"));
-        assertThat(archivedGraph.getStateBackendName().get(), is("Unknown"));
+        
assertThat(archivedGraph.getCheckpointCoordinatorConfiguration()).isNotNull();
+        assertThat(archivedGraph.getCheckpointStatsSnapshot()).isNotNull();
+        
assertThat(archivedGraph.getCheckpointStorageName().get()).isEqualTo("Unknown");
+        
assertThat(archivedGraph.getStateBackendName().get()).isEqualTo("Unknown");
     }
 
     @Test
-    public void testArchiveWithStatusOverride() throws IOException, 
ClassNotFoundException {
+    void testArchiveWithStatusOverride() throws IOException, 
ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph =
                 ArchivedExecutionGraph.createFrom(runtimeGraph, 
JobStatus.RESTARTING);
 
-        assertThat(archivedGraph.getState(), is(JobStatus.RESTARTING));
-        assertThat(archivedGraph.getStatusTimestamp(JobStatus.FAILED), is(0L));
+        assertThat(archivedGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+        
assertThat(archivedGraph.getStatusTimestamp(JobStatus.FAILED)).isEqualTo(0L);
     }
 
     private static void compareExecutionGraph(
@@ -213,41 +202,31 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
         // 
-------------------------------------------------------------------------------------------------------------
         // ExecutionGraph
         // 
-------------------------------------------------------------------------------------------------------------
-        assertEquals(runtimeGraph.getJsonPlan(), archivedGraph.getJsonPlan());
-        assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
-        assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
-        assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-        assertEquals(
-                runtimeGraph.getFailureInfo().getExceptionAsString(),
-                archivedGraph.getFailureInfo().getExceptionAsString());
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CREATED),
-                archivedGraph.getStatusTimestamp(JobStatus.CREATED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.RUNNING),
-                archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FAILING),
-                archivedGraph.getStatusTimestamp(JobStatus.FAILING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FAILED),
-                archivedGraph.getStatusTimestamp(JobStatus.FAILED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING),
-                archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CANCELED),
-                archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FINISHED),
-                archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING),
-                archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED),
-                archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
-        assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable());
+        
assertThat(runtimeGraph.getJsonPlan()).isEqualTo(archivedGraph.getJsonPlan());
+        
assertThat(runtimeGraph.getJobID()).isEqualTo(archivedGraph.getJobID());
+        
assertThat(runtimeGraph.getJobName()).isEqualTo(archivedGraph.getJobName());
+        
assertThat(runtimeGraph.getState()).isEqualTo(archivedGraph.getState());
+        assertThat(runtimeGraph.getFailureInfo().getExceptionAsString())
+                
.isEqualTo(archivedGraph.getFailureInfo().getExceptionAsString());
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CREATED))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CREATED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FAILING))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FAILING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FAILED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FAILED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED))
+                
.isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
+        
assertThat(runtimeGraph.isStoppable()).isEqualTo(archivedGraph.isStoppable());
 
         // 
-------------------------------------------------------------------------------------------------------------
         // CheckpointStats
@@ -279,19 +258,16 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
             StatsSummarySnapshot runtime = 
meter.apply(runtimeSnapshot.getSummaryStats());
             StatsSummarySnapshot archived = 
meter.apply(runtimeSnapshot.getSummaryStats());
             for (Function<StatsSummarySnapshot, Object> agg : aggs) {
-                assertEquals(agg.apply(runtime), agg.apply(archived));
+                assertThat(agg.apply(runtime)).isEqualTo(agg.apply(archived));
             }
         }
 
-        assertEquals(
-                runtimeSnapshot.getCounts().getTotalNumberOfCheckpoints(),
-                archivedSnapshot.getCounts().getTotalNumberOfCheckpoints());
-        assertEquals(
-                runtimeSnapshot.getCounts().getNumberOfCompletedCheckpoints(),
-                
archivedSnapshot.getCounts().getNumberOfCompletedCheckpoints());
-        assertEquals(
-                runtimeSnapshot.getCounts().getNumberOfInProgressCheckpoints(),
-                
archivedSnapshot.getCounts().getNumberOfInProgressCheckpoints());
+        assertThat(runtimeSnapshot.getCounts().getTotalNumberOfCheckpoints())
+                
.isEqualTo(archivedSnapshot.getCounts().getTotalNumberOfCheckpoints());
+        
assertThat(runtimeSnapshot.getCounts().getNumberOfCompletedCheckpoints())
+                
.isEqualTo(archivedSnapshot.getCounts().getNumberOfCompletedCheckpoints());
+        
assertThat(runtimeSnapshot.getCounts().getNumberOfInProgressCheckpoints())
+                
.isEqualTo(archivedSnapshot.getCounts().getNumberOfInProgressCheckpoints());
 
         // 
-------------------------------------------------------------------------------------------------------------
         // ArchivedExecutionConfig
@@ -299,24 +275,23 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
         ArchivedExecutionConfig runtimeConfig = 
runtimeGraph.getArchivedExecutionConfig();
         ArchivedExecutionConfig archivedConfig = 
archivedGraph.getArchivedExecutionConfig();
 
-        assertEquals(runtimeConfig.getExecutionMode(), 
archivedConfig.getExecutionMode());
-        assertEquals(runtimeConfig.getParallelism(), 
archivedConfig.getParallelism());
-        assertEquals(runtimeConfig.getObjectReuseEnabled(), 
archivedConfig.getObjectReuseEnabled());
-        assertEquals(
-                runtimeConfig.getRestartStrategyDescription(),
-                archivedConfig.getRestartStrategyDescription());
-        assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
-        assertEquals(
-                runtimeConfig.getGlobalJobParameters().get("hello"),
-                archivedConfig.getGlobalJobParameters().get("hello"));
+        
assertThat(runtimeConfig.getExecutionMode()).isEqualTo(archivedConfig.getExecutionMode());
+        
assertThat(runtimeConfig.getParallelism()).isEqualTo(archivedConfig.getParallelism());
+        assertThat(runtimeConfig.getObjectReuseEnabled())
+                .isEqualTo(archivedConfig.getObjectReuseEnabled());
+        assertThat(runtimeConfig.getRestartStrategyDescription())
+                .isEqualTo(archivedConfig.getRestartStrategyDescription());
+        
assertThat(archivedConfig.getGlobalJobParameters().get("hello")).isNotNull();
+        assertThat(runtimeConfig.getGlobalJobParameters().get("hello"))
+                
.isEqualTo(archivedConfig.getGlobalJobParameters().get("hello"));
 
         // 
-------------------------------------------------------------------------------------------------------------
         // StringifiedAccumulators
         // 
-------------------------------------------------------------------------------------------------------------
-        compareStringifiedAccumulators(
+        ArchivedExecutionGraphTestUtils.compareStringifiedAccumulators(
                 runtimeGraph.getAccumulatorResultsStringified(),
                 archivedGraph.getAccumulatorResultsStringified());
-        compareSerializedAccumulators(
+        ArchivedExecutionGraphTestUtils.compareSerializedAccumulators(
                 runtimeGraph.getAccumulatorsSerialized(),
                 archivedGraph.getAccumulatorsSerialized());
 
@@ -339,7 +314,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                 archivedGraph.getVerticesTopologically().iterator();
 
         while (runtimeTopologicalVertices.hasNext()) {
-            assertTrue(archiveTopologicaldVertices.hasNext());
+            assertThat(archiveTopologicaldVertices.hasNext()).isTrue();
             compareExecutionJobVertex(
                     runtimeTopologicalVertices.next(), 
archiveTopologicaldVertices.next());
         }
@@ -353,199 +328,32 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
                 archivedGraph.getAllExecutionVertices().iterator();
 
         while (runtimeExecutionVertices.hasNext()) {
-            assertTrue(archivedExecutionVertices.hasNext());
-            compareExecutionVertex(
+            assertThat(archivedExecutionVertices.hasNext()).isTrue();
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(
                     runtimeExecutionVertices.next(), 
archivedExecutionVertices.next());
         }
     }
 
     private static void compareExecutionJobVertex(
             AccessExecutionJobVertex runtimeJobVertex, 
AccessExecutionJobVertex archivedJobVertex) {
-        assertEquals(runtimeJobVertex.getName(), archivedJobVertex.getName());
-        assertEquals(runtimeJobVertex.getParallelism(), 
archivedJobVertex.getParallelism());
-        assertEquals(runtimeJobVertex.getMaxParallelism(), 
archivedJobVertex.getMaxParallelism());
-        assertEquals(runtimeJobVertex.getJobVertexId(), 
archivedJobVertex.getJobVertexId());
-        assertEquals(runtimeJobVertex.getAggregateState(), 
archivedJobVertex.getAggregateState());
-
-        compareStringifiedAccumulators(
+        
assertThat(runtimeJobVertex.getName()).isEqualTo(archivedJobVertex.getName());
+        
assertThat(runtimeJobVertex.getParallelism()).isEqualTo(archivedJobVertex.getParallelism());
+        assertThat(runtimeJobVertex.getMaxParallelism())
+                .isEqualTo(archivedJobVertex.getMaxParallelism());
+        
assertThat(runtimeJobVertex.getJobVertexId()).isEqualTo(archivedJobVertex.getJobVertexId());
+        assertThat(runtimeJobVertex.getAggregateState())
+                .isEqualTo(archivedJobVertex.getAggregateState());
+
+        ArchivedExecutionGraphTestUtils.compareStringifiedAccumulators(
                 runtimeJobVertex.getAggregatedUserAccumulatorsStringified(),
                 archivedJobVertex.getAggregatedUserAccumulatorsStringified());
 
         AccessExecutionVertex[] runtimeExecutionVertices = 
runtimeJobVertex.getTaskVertices();
         AccessExecutionVertex[] archivedExecutionVertices = 
archivedJobVertex.getTaskVertices();
-        assertEquals(runtimeExecutionVertices.length, 
archivedExecutionVertices.length);
+        
assertThat(runtimeExecutionVertices.length).isEqualTo(archivedExecutionVertices.length);
         for (int x = 0; x < runtimeExecutionVertices.length; x++) {
-            compareExecutionVertex(runtimeExecutionVertices[x], 
archivedExecutionVertices[x]);
-        }
-    }
-
-    private static void compareExecutionVertex(
-            AccessExecutionVertex runtimeVertex, AccessExecutionVertex 
archivedVertex) {
-        assertEquals(
-                runtimeVertex.getTaskNameWithSubtaskIndex(),
-                archivedVertex.getTaskNameWithSubtaskIndex());
-        assertEquals(
-                runtimeVertex.getParallelSubtaskIndex(), 
archivedVertex.getParallelSubtaskIndex());
-        assertEquals(runtimeVertex.getExecutionState(), 
archivedVertex.getExecutionState());
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CREATED),
-                archivedVertex.getStateTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED),
-                archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING),
-                archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.INITIALIZING),
-                archivedVertex.getStateTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.RUNNING),
-                archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.FINISHED),
-                archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CANCELING),
-                archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CANCELED),
-                archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.FAILED),
-                archivedVertex.getStateTimestamp(ExecutionState.FAILED));
-        assertThat(
-                
runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString),
-                
is(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
-        assertThat(
-                runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp),
-                
is(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp)));
-        assertEquals(
-                runtimeVertex.getCurrentAssignedResourceLocation(),
-                archivedVertex.getCurrentAssignedResourceLocation());
-
-        compareExecution(
-                runtimeVertex.getCurrentExecutionAttempt(),
-                archivedVertex.getCurrentExecutionAttempt());
-    }
-
-    private static void compareExecution(
-            AccessExecution runtimeExecution, AccessExecution 
archivedExecution) {
-        assertEquals(runtimeExecution.getAttemptId(), 
archivedExecution.getAttemptId());
-        assertEquals(runtimeExecution.getAttemptNumber(), 
archivedExecution.getAttemptNumber());
-        assertArrayEquals(
-                runtimeExecution.getStateTimestamps(), 
archivedExecution.getStateTimestamps());
-        assertArrayEquals(
-                runtimeExecution.getStateEndTimestamps(),
-                archivedExecution.getStateEndTimestamps());
-        assertEquals(runtimeExecution.getState(), 
archivedExecution.getState());
-        assertEquals(
-                runtimeExecution.getAssignedResourceLocation(),
-                archivedExecution.getAssignedResourceLocation());
-        assertThat(
-                
runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString),
-                
is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
-        assertThat(
-                runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp),
-                
is(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp)));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CREATED),
-                archivedExecution.getStateTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED),
-                archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING),
-                archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                
runtimeExecution.getStateTimestamp(ExecutionState.INITIALIZING),
-                
archivedExecution.getStateTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.RUNNING),
-                archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.FINISHED),
-                archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CANCELING),
-                archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CANCELED),
-                archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.FAILED),
-                archivedExecution.getStateTimestamp(ExecutionState.FAILED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                
runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                
runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                
runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                
runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED),
-                
archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
-        compareStringifiedAccumulators(
-                runtimeExecution.getUserAccumulatorsStringified(),
-                archivedExecution.getUserAccumulatorsStringified());
-        assertEquals(
-                runtimeExecution.getParallelSubtaskIndex(),
-                archivedExecution.getParallelSubtaskIndex());
-    }
-
-    private static void compareStringifiedAccumulators(
-            StringifiedAccumulatorResult[] runtimeAccs,
-            StringifiedAccumulatorResult[] archivedAccs) {
-        assertEquals(runtimeAccs.length, archivedAccs.length);
-
-        for (int x = 0; x < runtimeAccs.length; x++) {
-            StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
-            StringifiedAccumulatorResult archivedResult = archivedAccs[x];
-
-            assertEquals(runtimeResult.getName(), archivedResult.getName());
-            assertEquals(runtimeResult.getType(), archivedResult.getType());
-            assertEquals(runtimeResult.getValue(), archivedResult.getValue());
-        }
-    }
-
-    private static void compareSerializedAccumulators(
-            Map<String, SerializedValue<OptionalFailure<Object>>> runtimeAccs,
-            Map<String, SerializedValue<OptionalFailure<Object>>> archivedAccs)
-            throws IOException, ClassNotFoundException {
-        assertEquals(runtimeAccs.size(), archivedAccs.size());
-        for (Entry<String, SerializedValue<OptionalFailure<Object>>> 
runtimeAcc :
-                runtimeAccs.entrySet()) {
-            long runtimeUserAcc =
-                    (long)
-                            runtimeAcc
-                                    .getValue()
-                                    
.deserializeValue(ClassLoader.getSystemClassLoader())
-                                    .getUnchecked();
-            long archivedUserAcc =
-                    (long)
-                            archivedAccs
-                                    .get(runtimeAcc.getKey())
-                                    
.deserializeValue(ClassLoader.getSystemClassLoader())
-                                    .getUnchecked();
-
-            assertEquals(runtimeUserAcc, archivedUserAcc);
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(
+                    runtimeExecutionVertices[x], archivedExecutionVertices[x]);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
new file mode 100644
index 00000000000..55a10cd5396
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ArchivedExecutionGraphTestUtils {
+
+    private ArchivedExecutionGraphTestUtils() {}
+
+    static void compareExecutionVertex(
+            AccessExecutionVertex runtimeVertex, AccessExecutionVertex 
archivedVertex) {
+        assertThat(runtimeVertex.getTaskNameWithSubtaskIndex())
+                .isEqualTo(archivedVertex.getTaskNameWithSubtaskIndex());
+        assertThat(runtimeVertex.getParallelSubtaskIndex())
+                .isEqualTo(archivedVertex.getParallelSubtaskIndex());
+        
assertThat(runtimeVertex.getExecutionState()).isEqualTo(archivedVertex.getExecutionState());
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CREATED))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CREATED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
+        
assertThat(runtimeVertex.getStateTimestamp(ExecutionState.INITIALIZING))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.INITIALIZING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.RUNNING))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.FINISHED))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CANCELING))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CANCELED))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.FAILED))
+                
.isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.FAILED));
+        
assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString))
+                
.isEqualTo(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString));
+        assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp))
+                
.isEqualTo(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp));
+        assertThat(runtimeVertex.getCurrentAssignedResourceLocation())
+                
.isEqualTo(archivedVertex.getCurrentAssignedResourceLocation());
+
+        compareExecution(
+                runtimeVertex.getCurrentExecutionAttempt(),
+                archivedVertex.getCurrentExecutionAttempt());
+    }
+
+    private static void compareExecution(
+            AccessExecution runtimeExecution, AccessExecution 
archivedExecution) {
+        
assertThat(runtimeExecution.getAttemptId()).isEqualTo(archivedExecution.getAttemptId());
+        assertThat(runtimeExecution.getAttemptNumber())
+                .isEqualTo(archivedExecution.getAttemptNumber());
+        assertThat(runtimeExecution.getStateTimestamps())
+                .containsExactly(archivedExecution.getStateTimestamps());
+        assertThat(runtimeExecution.getStateEndTimestamps())
+                .containsExactly(archivedExecution.getStateEndTimestamps());
+        
assertThat(runtimeExecution.getState()).isEqualTo(archivedExecution.getState());
+        assertThat(runtimeExecution.getAssignedResourceLocation())
+                .isEqualTo(archivedExecution.getAssignedResourceLocation());
+        
assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))
+                
.isEqualTo(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString));
+        
assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp))
+                
.isEqualTo(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CREATED))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CREATED));
+        
assertThat(runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
+        
assertThat(runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
+        
assertThat(runtimeExecution.getStateTimestamp(ExecutionState.INITIALIZING))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.INITIALIZING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.RUNNING))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.FINISHED))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
+        
assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CANCELING))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.FAILED))
+                
.isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
+        
assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED))
+                
.isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
+        compareStringifiedAccumulators(
+                runtimeExecution.getUserAccumulatorsStringified(),
+                archivedExecution.getUserAccumulatorsStringified());
+        assertThat(runtimeExecution.getParallelSubtaskIndex())
+                .isEqualTo(archivedExecution.getParallelSubtaskIndex());
+    }
+
+    static void compareStringifiedAccumulators(
+            StringifiedAccumulatorResult[] runtimeAccs,
+            StringifiedAccumulatorResult[] archivedAccs) {
+        assertThat(runtimeAccs.length).isEqualTo(archivedAccs.length);
+
+        for (int x = 0; x < runtimeAccs.length; x++) {
+            StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
+            StringifiedAccumulatorResult archivedResult = archivedAccs[x];
+
+            
assertThat(runtimeResult.getName()).isEqualTo(archivedResult.getName());
+            
assertThat(runtimeResult.getType()).isEqualTo(archivedResult.getType());
+            
assertThat(runtimeResult.getValue()).isEqualTo(archivedResult.getValue());
+        }
+    }
+
+    static void compareSerializedAccumulators(
+            Map<String, SerializedValue<OptionalFailure<Object>>> runtimeAccs,
+            Map<String, SerializedValue<OptionalFailure<Object>>> archivedAccs)
+            throws IOException, ClassNotFoundException {
+        assertThat(runtimeAccs.size()).isEqualTo(archivedAccs.size());
+        for (Entry<String, SerializedValue<OptionalFailure<Object>>> 
runtimeAcc :
+                runtimeAccs.entrySet()) {
+            long runtimeUserAcc =
+                    (long)
+                            runtimeAcc
+                                    .getValue()
+                                    
.deserializeValue(ClassLoader.getSystemClassLoader())
+                                    .getUnchecked();
+            long archivedUserAcc =
+                    (long)
+                            archivedAccs
+                                    .get(runtimeAcc.getKey())
+                                    
.deserializeValue(ClassLoader.getSystemClassLoader())
+                                    .getUnchecked();
+
+            assertThat(runtimeUserAcc).isEqualTo(archivedUserAcc);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 0ab6ddcd975..790ca43ce7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -21,20 +21,19 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link JobDetails}. */
-public class JobDetailsTest extends TestLogger {
+class JobDetailsTest {
     private static final String COMPATIBLE_JOB_DETAILS =
             "{"
                     + "  \"jid\" : \"7a7c3291accebd10b6be8d4f8c8d8dfc\","
@@ -60,7 +59,7 @@ public class JobDetailsTest extends TestLogger {
 
     /** Tests that we can marshal and unmarshal JobDetails instances. */
     @Test
-    public void testJobDetailsMarshalling() throws JsonProcessingException {
+    void testJobDetailsMarshalling() throws JsonProcessingException {
         final JobDetails expected =
                 new JobDetails(
                         new JobID(),
@@ -79,11 +78,11 @@ public class JobDetailsTest extends TestLogger {
 
         final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, 
JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(unmarshalled).isEqualTo(expected);
     }
 
     @Test
-    public void testJobDetailsCompatibleUnmarshalling() throws IOException {
+    void testJobDetailsCompatibleUnmarshalling() throws IOException {
         final JobDetails expected =
                 new JobDetails(
                         
JobID.fromHexString("7a7c3291accebd10b6be8d4f8c8d8dfc"),
@@ -101,6 +100,6 @@ public class JobDetailsTest extends TestLogger {
         final JobDetails unmarshalled =
                 objectMapper.readValue(COMPATIBLE_JOB_DETAILS, 
JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(unmarshalled).isEqualTo(expected);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index cfd7f51af35..52eec21ab5d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -41,14 +41,13 @@ import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_C
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the {@link MetricDumpSerialization}. */
-public class MetricDumpSerializerTest {
+class MetricDumpSerializerTest {
     @Test
-    public void testNullGaugeHandling() throws IOException {
+    void testNullGaugeHandling() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
         MetricDumpSerialization.MetricDumpDeserializer deserializer =
@@ -74,17 +73,16 @@ public class MetricDumpSerializerTest {
                         Collections.<Meter, Tuple2<QueryScopeInfo, 
String>>emptyMap());
 
         // no metrics should be serialized
-        Assert.assertEquals(0, output.serializedCounters.length);
-        Assert.assertEquals(0, output.serializedGauges.length);
-        Assert.assertEquals(0, output.serializedHistograms.length);
-        Assert.assertEquals(0, output.serializedMeters.length);
-
+        assertThat(output.serializedCounters).isEmpty();
+        assertThat(output.serializedGauges).isEmpty();
+        assertThat(output.serializedHistograms).isEmpty();
+        assertThat(output.serializedMeters).isEmpty();
         List<MetricDump> deserialized = deserializer.deserialize(output);
-        Assert.assertEquals(0, deserialized.size());
+        assertThat(deserialized).isEmpty();
     }
 
     @Test
-    public void testJavaSerialization() throws IOException {
+    void testJavaSerialization() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
 
@@ -100,7 +98,7 @@ public class MetricDumpSerializerTest {
     }
 
     @Test
-    public void testSerialization() throws IOException {
+    void testSerialization() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
         MetricDumpSerialization.MetricDumpDeserializer deserializer =
@@ -174,7 +172,7 @@ public class MetricDumpSerializerTest {
 
         // ===== Counters
         // 
==============================================================================================
-        assertEquals(5, deserialized.size());
+        assertThat(deserialized.size()).isEqualTo(5);
 
         for (MetricDump metric : deserialized) {
             switch (metric.getCategory()) {
@@ -182,87 +180,86 @@ public class MetricDumpSerializerTest {
                     MetricDump.CounterDump counterDump = 
(MetricDump.CounterDump) metric;
                     switch ((byte) counterDump.count) {
                         case 1:
-                            assertTrue(
-                                    counterDump.scopeInfo
-                                            instanceof 
QueryScopeInfo.JobManagerQueryScopeInfo);
-                            assertEquals("A", counterDump.scopeInfo.scope);
-                            assertEquals("c1", counterDump.name);
+                            assertThat(counterDump.scopeInfo)
+                                    
.isInstanceOf(QueryScopeInfo.JobManagerQueryScopeInfo.class);
+                            
assertThat(counterDump.scopeInfo.scope).isEqualTo("A");
+                            assertThat(counterDump.name).isEqualTo("c1");
                             counters.remove(c1);
                             break;
                         case 2:
-                            assertTrue(
-                                    counterDump.scopeInfo
-                                            instanceof 
QueryScopeInfo.TaskManagerQueryScopeInfo);
-                            assertEquals("B", counterDump.scopeInfo.scope);
-                            assertEquals("c2", counterDump.name);
-                            assertEquals(
-                                    "tmid",
-                                    ((QueryScopeInfo.TaskManagerQueryScopeInfo)
-                                                    counterDump.scopeInfo)
-                                            .taskManagerID);
+                            assertThat(counterDump.scopeInfo)
+                                    
.isInstanceOf(QueryScopeInfo.TaskManagerQueryScopeInfo.class);
+                            
assertThat(counterDump.scopeInfo.scope).isEqualTo("B");
+                            assertThat(counterDump.name).isEqualTo("c2");
+                            assertThat("tmid")
+                                    .isEqualTo(
+                                            
((QueryScopeInfo.TaskManagerQueryScopeInfo)
+                                                            
counterDump.scopeInfo)
+                                                    .taskManagerID);
                             counters.remove(c2);
                             break;
                         default:
-                            fail();
+                            fail("Unexpected counter count.");
                     }
                     break;
                 case METRIC_CATEGORY_GAUGE:
                     MetricDump.GaugeDump gaugeDump = (MetricDump.GaugeDump) 
metric;
-                    assertEquals("4", gaugeDump.value);
-                    assertEquals("g1", gaugeDump.name);
+                    assertThat(gaugeDump.value).isEqualTo("4");
+                    assertThat(gaugeDump.name).isEqualTo("g1");
 
-                    assertTrue(gaugeDump.scopeInfo instanceof 
QueryScopeInfo.TaskQueryScopeInfo);
+                    assertThat(gaugeDump.scopeInfo)
+                            
.isInstanceOf(QueryScopeInfo.TaskQueryScopeInfo.class);
                     QueryScopeInfo.TaskQueryScopeInfo taskInfo =
                             (QueryScopeInfo.TaskQueryScopeInfo) 
gaugeDump.scopeInfo;
-                    assertEquals("D", taskInfo.scope);
-                    assertEquals("jid", taskInfo.jobID);
-                    assertEquals("vid", taskInfo.vertexID);
-                    assertEquals(2, taskInfo.subtaskIndex);
+                    assertThat(taskInfo.scope).isEqualTo("D");
+                    assertThat(taskInfo.jobID).isEqualTo("jid");
+                    assertThat(taskInfo.vertexID).isEqualTo("vid");
+                    assertThat(taskInfo.subtaskIndex).isEqualTo(2);
                     gauges.remove(g1);
                     break;
                 case METRIC_CATEGORY_HISTOGRAM:
                     MetricDump.HistogramDump histogramDump = 
(MetricDump.HistogramDump) metric;
-                    assertEquals("h1", histogramDump.name);
-                    assertEquals(0.5, histogramDump.median, 0.1);
-                    assertEquals(0.75, histogramDump.p75, 0.1);
-                    assertEquals(0.90, histogramDump.p90, 0.1);
-                    assertEquals(0.95, histogramDump.p95, 0.1);
-                    assertEquals(0.98, histogramDump.p98, 0.1);
-                    assertEquals(0.99, histogramDump.p99, 0.1);
-                    assertEquals(0.999, histogramDump.p999, 0.1);
-                    assertEquals(4, histogramDump.mean, 0.1);
-                    assertEquals(5, histogramDump.stddev, 0.1);
-                    assertEquals(6, histogramDump.max);
-                    assertEquals(7, histogramDump.min);
+                    assertThat(histogramDump.name).isEqualTo("h1");
+                    assertThat(histogramDump.median).isCloseTo(0.5, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p75).isCloseTo(0.75, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p90).isCloseTo(0.9, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p95).isCloseTo(0.95, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p98).isCloseTo(0.98, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p99).isCloseTo(0.99, 
Offset.offset(0.1));
+                    assertThat(histogramDump.p999).isCloseTo(0.999, 
Offset.offset(0.1));
+                    assertThat(histogramDump.mean).isCloseTo(4, 
Offset.offset(0.1));
+                    assertThat(histogramDump.stddev).isCloseTo(5, 
Offset.offset(0.1));
+                    assertThat(histogramDump.max).isEqualTo(6);
+                    assertThat(histogramDump.min).isEqualTo(7);
 
-                    assertTrue(
-                            histogramDump.scopeInfo
-                                    instanceof 
QueryScopeInfo.OperatorQueryScopeInfo);
+                    assertThat(histogramDump.scopeInfo)
+                            
.isInstanceOf(QueryScopeInfo.OperatorQueryScopeInfo.class);
                     QueryScopeInfo.OperatorQueryScopeInfo opInfo =
                             (QueryScopeInfo.OperatorQueryScopeInfo) 
histogramDump.scopeInfo;
-                    assertEquals("E", opInfo.scope);
-                    assertEquals("jid", opInfo.jobID);
-                    assertEquals("vid", opInfo.vertexID);
-                    assertEquals(2, opInfo.subtaskIndex);
-                    assertEquals("opname", opInfo.operatorName);
+                    assertThat(opInfo.scope).isEqualTo("E");
+                    assertThat(opInfo.jobID).isEqualTo("jid");
+                    assertThat(opInfo.vertexID).isEqualTo("vid");
+                    assertThat(opInfo.subtaskIndex).isEqualTo(2);
+                    assertThat(opInfo.operatorName).isEqualTo("opname");
                     histograms.remove(h1);
                     break;
                 case METRIC_CATEGORY_METER:
                     MetricDump.MeterDump meterDump = (MetricDump.MeterDump) 
metric;
-                    assertEquals(5.0, meterDump.rate, 0.1);
+                    assertThat(meterDump.rate).isCloseTo(5.0, 
Offset.offset(0.1));
 
-                    assertTrue(meterDump.scopeInfo instanceof 
QueryScopeInfo.JobQueryScopeInfo);
-                    assertEquals("C", meterDump.scopeInfo.scope);
-                    assertEquals("c3", meterDump.name);
-                    assertEquals(
-                            "jid", ((QueryScopeInfo.JobQueryScopeInfo) 
meterDump.scopeInfo).jobID);
+                    assertThat(meterDump.scopeInfo)
+                            
.isInstanceOf(QueryScopeInfo.JobQueryScopeInfo.class);
+                    assertThat(meterDump.scopeInfo.scope).isEqualTo("C");
+                    assertThat(meterDump.name).isEqualTo("c3");
+                    assertThat(((QueryScopeInfo.JobQueryScopeInfo) 
meterDump.scopeInfo).jobID)
+                            .isEqualTo("jid");
                     break;
                 default:
-                    fail();
+                    fail("Unexpected metric type: " + metric.getCategory());
             }
         }
-        assertTrue(counters.isEmpty());
-        assertTrue(gauges.isEmpty());
-        assertTrue(histograms.isEmpty());
+        assertThat(counters).isEmpty();
+        assertThat(gauges).isEmpty();
+        assertThat(histograms).isEmpty();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 029f53e57cc..0355d01fcf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -18,144 +18,144 @@
 
 package org.apache.flink.runtime.metrics.dump;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link QueryScopeInfo} classes. */
-public class QueryScopeInfoTest {
+class QueryScopeInfoTest {
     @Test
-    public void testJobManagerQueryScopeInfo() {
+    void testJobManagerQueryScopeInfo() {
         QueryScopeInfo.JobManagerQueryScopeInfo info =
                 new QueryScopeInfo.JobManagerQueryScopeInfo();
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("", info.scope);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEmpty();
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("world", info.scope);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("world");
 
         info = new QueryScopeInfo.JobManagerQueryScopeInfo("hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("hello", info.scope);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("hello");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("hello.world", info.scope);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("hello.world");
     }
 
     @Test
-    public void testTaskManagerQueryScopeInfo() {
+    void testTaskManagerQueryScopeInfo() {
         QueryScopeInfo.TaskManagerQueryScopeInfo info =
                 new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
     }
 
     @Test
-    public void testJobQueryScopeInfo() {
+    void testJobQueryScopeInfo() {
         QueryScopeInfo.JobQueryScopeInfo info = new 
QueryScopeInfo.JobQueryScopeInfo("jobid");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
     }
 
     @Test
-    public void testTaskQueryScopeInfo() {
+    void testTaskQueryScopeInfo() {
         QueryScopeInfo.TaskQueryScopeInfo info =
                 new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 
"hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
     }
 
     @Test
-    public void testOperatorQueryScopeInfo() {
+    void testOperatorQueryScopeInfo() {
         QueryScopeInfo.OperatorQueryScopeInfo info =
                 new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 
2, "opname");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, 
"opname", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index d38b7c1b8ed..77cd5047d3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -38,8 +38,8 @@ import 
org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,12 +52,10 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
 import static 
org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
 import static 
org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link JobVertexBackPressureHandler}. */
-public class JobVertexBackPressureHandlerTest {
+class JobVertexBackPressureHandlerTest {
 
     /** Job ID for which back pressure stats exist. */
     private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new 
JobID();
@@ -107,8 +105,8 @@ public class JobVertexBackPressureHandlerTest {
         return dumps;
     }
 
-    @Before
-    public void setUp() {
+    @BeforeEach
+    void setUp() {
         metricStore = new MetricStore();
         for (MetricDump metricDump : getMetricDumps()) {
             metricStore.add(metricDump);
@@ -141,7 +139,7 @@ public class JobVertexBackPressureHandlerTest {
     }
 
     @Test
-    public void testGetBackPressure() throws Exception {
+    void testGetBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(
                 JobIDPathParameter.KEY, 
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
@@ -161,42 +159,42 @@ public class JobVertexBackPressureHandlerTest {
         final JobVertexBackPressureInfo jobVertexBackPressureInfo =
                 jobVertexBackPressureInfoCompletableFuture.get();
 
-        assertThat(jobVertexBackPressureInfo.getStatus(), 
equalTo(VertexBackPressureStatus.OK));
-        assertThat(jobVertexBackPressureInfo.getBackpressureLevel(), 
equalTo(HIGH));
+        
assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK);
+        
assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(HIGH);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBackPressuredRatio)
-                        .collect(Collectors.toList()),
-                contains(1.0, 0.5, 0.1));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                
.map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(1.0, 0.5, 0.1);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getIdleRatio)
-                        .collect(Collectors.toList()),
-                contains(0.0, 0.1, 0.2));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.1, 0.2);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBusyRatio)
-                        .collect(Collectors.toList()),
-                contains(0.0, 0.9, 0.7));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.9, 0.7);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBackpressureLevel)
-                        .collect(Collectors.toList()),
-                contains(HIGH, LOW, OK));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                
.map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(HIGH, LOW, OK);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getSubtask)
-                        .collect(Collectors.toList()),
-                contains(0, 1, 3));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1, 3);
     }
 
     @Test
-    public void testAbsentBackPressure() throws Exception {
+    void testAbsentBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(
                 JobIDPathParameter.KEY, 
TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
@@ -216,8 +214,7 @@ public class JobVertexBackPressureHandlerTest {
         final JobVertexBackPressureInfo jobVertexBackPressureInfo =
                 jobVertexBackPressureInfoCompletableFuture.get();
 
-        assertThat(
-                jobVertexBackPressureInfo.getStatus(),
-                equalTo(VertexBackPressureStatus.DEPRECATED));
+        assertThat(jobVertexBackPressureInfo.getStatus())
+                .isEqualTo(VertexBackPressureStatus.DEPRECATED);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 9bb6d1fde71..97c739df224 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -20,38 +20,40 @@ package 
org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the MetricStore. */
-public class MetricStoreTest extends TestLogger {
+class MetricStoreTest {
+
     @Test
-    public void testAdd() throws IOException {
+    void testAdd() throws IOException {
         MetricStore store = setupStore(new MetricStore());
 
-        assertEquals("0", 
store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
-        assertEquals("1", 
store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
-        assertEquals("2", 
store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
-        assertEquals("3", 
store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1"));
-        assertEquals(
-                "4", store.getTaskMetricStore("jobid", 
"taskid").getMetric("8.abc.metric5", "-1"));
-        assertEquals(
-                "5",
-                store.getTaskMetricStore("jobid", "taskid")
-                        .getMetric("8.opname.abc.metric6", "-1"));
-        assertEquals(
-                "6",
-                store.getTaskMetricStore("jobid", "taskid")
-                        .getMetric("8.opname.abc.metric7", "-1"));
+        assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", 
"-1")).isEqualTo("0");
+        
assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", 
"-1"))
+                .isEqualTo("1");
+        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric3", 
"-1")).isEqualTo("2");
+        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", 
"-1")).isEqualTo("3");
+
+        assertThat(store.getTaskMetricStore("jobid", 
"taskid").getMetric("8.abc.metric5", "-1"))
+                .isEqualTo("4");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("8.opname.abc.metric6", "-1"))
+                .isEqualTo("5");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("8.opname.abc.metric7", "-1"))
+                .isEqualTo("6");
     }
 
     @Test
-    public void testMalformedNameHandling() {
+    void testMalformedNameHandling() {
         MetricStore store = new MetricStore();
         // -----verify that no exceptions are thrown
 
@@ -64,12 +66,12 @@ public class MetricStoreTest extends TestLogger {
         store.add(cd);
 
         // -----verify that no side effects occur
-        assertEquals(0, store.getJobManager().metrics.size());
-        assertEquals(0, store.getTaskManagers().size());
-        assertEquals(0, store.getJobs().size());
+        assertThat(store.getJobManager().metrics).isEmpty();
+        assertThat(store.getTaskManagers()).isEmpty();
+        assertThat(store.getJobs()).isEmpty();
     }
 
-    public static MetricStore setupStore(MetricStore store) {
+    static MetricStore setupStore(MetricStore store) {
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 
0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index ee6f15c2b29..95a64901af9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -39,13 +39,12 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,11 +53,10 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests the {@link TaskManagerDetailsHandler} implementation. */
-public class TaskManagerDetailsHandlerTest extends TestLogger {
+class TaskManagerDetailsHandlerTest {
 
     private static final ResourceID TASK_MANAGER_ID = ResourceID.generate();
 
@@ -67,8 +65,8 @@ public class TaskManagerDetailsHandlerTest extends TestLogger 
{
 
     private TaskManagerDetailsHandler testInstance;
 
-    @Before
-    public void setup() throws HandlerRequestException {
+    @BeforeEach
+    void setup() throws HandlerRequestException {
         resourceManagerGateway = new TestingResourceManagerGateway();
         metricFetcher = new TestingMetricFetcher();
 
@@ -83,7 +81,7 @@ public class TaskManagerDetailsHandlerTest extends TestLogger 
{
     }
 
     @Test
-    public void testTaskManagerMetricsInfoExtraction()
+    void testTaskManagerMetricsInfoExtraction()
             throws RestHandlerException, ExecutionException, 
InterruptedException,
                     JsonProcessingException, HandlerRequestException {
         initializeMetricStore(metricFetcher.getMetricStore());
@@ -124,7 +122,7 @@ public class TaskManagerDetailsHandlerTest extends 
TestLogger {
         String actualJson = objectMapper.writeValueAsString(actual);
         String expectedJson = objectMapper.writeValueAsString(expected);
 
-        assertThat(actualJson, is(expectedJson));
+        assertThat(actualJson).isEqualTo(expectedJson);
     }
 
     private static void initializeMetricStore(MetricStore metricStore) {

Reply via email to