1996fanrui commented on code in PR #23199:
URL: https://github.com/apache/flink/pull/23199#discussion_r1298421164


##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java:
##########
@@ -18,90 +18,83 @@
 
 package org.apache.flink.runtime.util;
 
-import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
 
-import org.junit.Test;
-
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */
-public class BoundedFIFOQueueTest extends TestLogger {
+class BoundedFIFOQueueTest {
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testConstructorFailing() {
-        new BoundedFIFOQueue<>(-1);
+    @Test
+    void testConstructorFailing() {
+        assertThatThrownBy(() -> new BoundedFIFOQueue<>(-1))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testQueueWithMaxSize0() {
+    void testQueueWithMaxSize0() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance).isEmpty();
         testInstance.add(1);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance).isEmpty();
     }
 
     @Test
-    public void testQueueWithMaxSize2() {
+    void testQueueWithMaxSize2() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance.size()).isZero();

Review Comment:
   ```suggestion
           assertThat(testInstance).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -86,49 +84,47 @@ public void testSerialization() {
                     String.format(
                             "%s: %s",
                             userException2.getClass().getName(), 
userException2.getMessage());
-            assertEquals(serialized2.getMessage(), result);
+            assertThat(serialized2).hasMessage(result);
 
             // copy the serialized throwable and make sure everything still 
works
             SerializedThrowable copy = 
CommonTestUtils.createCopySerializable(serialized);
-            assertEquals(
-                    ExceptionUtils.stringifyException(userException),
-                    ExceptionUtils.stringifyException(copy));
-            assertArrayEquals(userException.getStackTrace(), 
copy.getStackTrace());
+            assertThat(ExceptionUtils.stringifyException(userException))
+                    .isEqualTo(ExceptionUtils.stringifyException(copy));
+            
assertThat(userException.getStackTrace()).isEqualTo(copy.getStackTrace());
 
             // deserialize the proper exception
             Throwable deserialized = copy.deserializeError(loader);
-            assertEquals(clazz, deserialized.getClass());
+            assertThat(deserialized.getClass()).isEqualTo(clazz);
 
             // deserialization with the wrong classloader does not lead to a 
failure
             Throwable wronglyDeserialized = 
copy.deserializeError(getClass().getClassLoader());
-            assertEquals(
-                    ExceptionUtils.stringifyException(userException),
-                    ExceptionUtils.stringifyException(wronglyDeserialized));
+            assertThat(ExceptionUtils.stringifyException(userException))
+                    
.isEqualTo(ExceptionUtils.stringifyException(wronglyDeserialized));
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
     @Test
-    public void testCauseChaining() {
+    void testCauseChaining() {
         Exception cause2 = new Exception("level2");
         Exception cause1 = new Exception("level1", cause2);
         Exception root = new Exception("level0", cause1);
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertEquals("java.lang.Exception: level0", st.getMessage());
+        assertThat(st).hasMessage("java.lang.Exception: level0");
 
-        assertNotNull(st.getCause());
-        assertEquals("java.lang.Exception: level1", 
st.getCause().getMessage());
+        assertThat(st.getCause()).isNotNull();
+        assertThat(st.getCause()).hasMessage("java.lang.Exception: level1");

Review Comment:
   ```suggestion
           
assertThat(st.getCause()).isNotNull().hasMessage("java.lang.Exception: level1");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/RunnablesTest.java:
##########
@@ -125,11 +123,11 @@ private static void 
testWithUncaughtExceptionHandler(Runnable runnable, Throwabl
                             handlerCalled.countDown();
                         });
         scheduledExecutorService.execute(guardedRunnable);
-        Assert.assertTrue(handlerCalled.await(100, TimeUnit.MILLISECONDS));
-        Assert.assertNotNull(thread.get());
-        Assert.assertNotNull(throwable.get());
-        Assert.assertEquals("ueh-test-0", thread.get().getName());
-        Assert.assertEquals(expected.getClass(), throwable.get().getClass());
-        Assert.assertEquals("foo", throwable.get().getMessage());
+        assertThat(handlerCalled.await(100, TimeUnit.MILLISECONDS)).isTrue();
+        assertThat(thread.get()).isNotNull();
+        assertThat(throwable.get()).isNotNull();

Review Comment:
   ```suggestion
           assertThat(thread).isNotNull();
           assertThat(throwable).isNotNull();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -139,53 +135,53 @@ public void testCyclicCauseChaining() {
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertArrayEquals(root.getStackTrace(), st.getStackTrace());
-        assertEquals(
-                ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+        assertThat(root.getStackTrace()).isEqualTo(st.getStackTrace());
+        assertThat(ExceptionUtils.stringifyException(root))
+                .isEqualTo(ExceptionUtils.stringifyException(st));
     }
 
     @Test
-    public void testCopyPreservesCause() {
+    void testCopyPreservesCause() {
         Exception original = new Exception("original message");
         Exception parent = new Exception("parent message", original);
 
         SerializedThrowable serialized = new SerializedThrowable(parent);
-        assertNotNull(serialized.getCause());
+        assertThat(serialized.getCause()).isNotNull();
 
         SerializedThrowable copy = new SerializedThrowable(serialized);
-        assertEquals(
-                "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message",
-                copy.getMessage());
-        assertNotNull(copy.getCause());
-        assertEquals("java.lang.Exception: original message", 
copy.getCause().getMessage());
+        assertThat(copy)
+                .hasMessage(
+                        "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message");
+        assertThat(copy.getCause()).isNotNull();
+        assertThat(copy.getCause()).hasMessage("java.lang.Exception: original 
message");

Review Comment:
   ```suggestion
           
assertThat(copy.getCause()).isNotNull().hasMessage("java.lang.Exception: 
original message");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -139,53 +135,53 @@ public void testCyclicCauseChaining() {
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertArrayEquals(root.getStackTrace(), st.getStackTrace());
-        assertEquals(
-                ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+        assertThat(root.getStackTrace()).isEqualTo(st.getStackTrace());
+        assertThat(ExceptionUtils.stringifyException(root))
+                .isEqualTo(ExceptionUtils.stringifyException(st));
     }
 
     @Test
-    public void testCopyPreservesCause() {
+    void testCopyPreservesCause() {
         Exception original = new Exception("original message");
         Exception parent = new Exception("parent message", original);
 
         SerializedThrowable serialized = new SerializedThrowable(parent);
-        assertNotNull(serialized.getCause());
+        assertThat(serialized.getCause()).isNotNull();
 
         SerializedThrowable copy = new SerializedThrowable(serialized);
-        assertEquals(
-                "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message",
-                copy.getMessage());
-        assertNotNull(copy.getCause());
-        assertEquals("java.lang.Exception: original message", 
copy.getCause().getMessage());
+        assertThat(copy)
+                .hasMessage(
+                        "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message");
+        assertThat(copy.getCause()).isNotNull();
+        assertThat(copy.getCause()).hasMessage("java.lang.Exception: original 
message");
     }
 
     @Test
-    public void testSuppressedTransferring() {
+    void testSuppressedTransferring() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
 
-        assertEquals(1, serializedThrowable.getSuppressed().length);
+        assertThat(serializedThrowable.getSuppressed().length).isOne();
         Throwable actualSuppressed = serializedThrowable.getSuppressed()[0];
-        assertTrue(actualSuppressed instanceof SerializedThrowable);
-        assertEquals("java.lang.Exception: suppressed", 
actualSuppressed.getMessage());
+        assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class);
+        assertThat(actualSuppressed).hasMessage("java.lang.Exception: 
suppressed");
     }
 
     @Test
-    public void testCopySuppressed() {
+    void testCopySuppressed() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
         SerializedThrowable copy = new 
SerializedThrowable(serializedThrowable);
 
-        assertEquals(1, copy.getSuppressed().length);
+        assertThat(copy.getSuppressed().length).isOne();
         Throwable actualSuppressed = copy.getSuppressed()[0];
-        assertTrue(actualSuppressed instanceof SerializedThrowable);
-        assertEquals("java.lang.Exception: suppressed", 
actualSuppressed.getMessage());
+        assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class);
+        assertThat(actualSuppressed).hasMessage("java.lang.Exception: 
suppressed");

Review Comment:
   ```suggestion
           
assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class).hasMessage("java.lang.Exception:
 suppressed");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java:
##########
@@ -18,90 +18,83 @@
 
 package org.apache.flink.runtime.util;
 
-import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
 
-import org.junit.Test;
-
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */
-public class BoundedFIFOQueueTest extends TestLogger {
+class BoundedFIFOQueueTest {
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testConstructorFailing() {
-        new BoundedFIFOQueue<>(-1);
+    @Test
+    void testConstructorFailing() {
+        assertThatThrownBy(() -> new BoundedFIFOQueue<>(-1))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testQueueWithMaxSize0() {
+    void testQueueWithMaxSize0() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance).isEmpty();
         testInstance.add(1);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance).isEmpty();
     }
 
     @Test
-    public void testQueueWithMaxSize2() {
+    void testQueueWithMaxSize2() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
-        assertThat(testInstance, iterableWithSize(0));
+        assertThat(testInstance.size()).isZero();
 
         testInstance.add(1);
-        assertThat(testInstance, contains(1));
+        assertThat(testInstance).contains(1);
 
         testInstance.add(2);
-        assertThat(testInstance, contains(1, 2));
+        assertThat(testInstance).contains(1, 2);
 
         testInstance.add(3);
-        assertThat(testInstance, contains(2, 3));
+        assertThat(testInstance).contains(2, 3);
     }
 
     @Test
-    public void testAddNullHandling() {
+    void testAddNullHandling() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(1);
-        try {
-            testInstance.add(null);
-            fail("A NullPointerException is expected to be thrown.");
-        } catch (NullPointerException e) {
-            // NullPointerException is expected
-        }
-
-        assertThat(testInstance, iterableWithSize(0));
+        assertThatThrownBy(() -> testInstance.add(null))
+                .withFailMessage("A NullPointerException is expected to be 
thrown.")
+                .isInstanceOf(NullPointerException.class);
+
+        assertThat(testInstance).isEmpty();
     }
 
     /**
      * Tests that {@link BoundedFIFOQueue#size()} returns the number of 
elements currently stored in
      * the queue with a {@code maxSize} of 0.
      */
     @Test
-    public void testSizeWithMaxSize0() {
+    void testSizeWithMaxSize0() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
-        assertThat(testInstance.size(), is(0));
+        assertThat(testInstance).isEmpty();
 
         testInstance.add(1);
-        assertThat(testInstance.size(), is(0));
+        assertThat(testInstance).isEmpty();
     }
 
     /**
      * Tests that {@link BoundedFIFOQueue#size()} returns the number of 
elements currently stored in
      * the queue with a {@code maxSize} of 2.
      */
     @Test
-    public void testSizeWithMaxSize2() {
+    void testSizeWithMaxSize2() {
         final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
-        assertThat(testInstance.size(), is(0));
+        assertThat(testInstance.size()).isZero();

Review Comment:
   ```suggestion
           assertThat(testInstance.size()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -139,53 +135,53 @@ public void testCyclicCauseChaining() {
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertArrayEquals(root.getStackTrace(), st.getStackTrace());
-        assertEquals(
-                ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+        assertThat(root.getStackTrace()).isEqualTo(st.getStackTrace());
+        assertThat(ExceptionUtils.stringifyException(root))
+                .isEqualTo(ExceptionUtils.stringifyException(st));
     }
 
     @Test
-    public void testCopyPreservesCause() {
+    void testCopyPreservesCause() {
         Exception original = new Exception("original message");
         Exception parent = new Exception("parent message", original);
 
         SerializedThrowable serialized = new SerializedThrowable(parent);
-        assertNotNull(serialized.getCause());
+        assertThat(serialized.getCause()).isNotNull();
 
         SerializedThrowable copy = new SerializedThrowable(serialized);
-        assertEquals(
-                "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message",
-                copy.getMessage());
-        assertNotNull(copy.getCause());
-        assertEquals("java.lang.Exception: original message", 
copy.getCause().getMessage());
+        assertThat(copy)
+                .hasMessage(
+                        "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message");
+        assertThat(copy.getCause()).isNotNull();
+        assertThat(copy.getCause()).hasMessage("java.lang.Exception: original 
message");
     }
 
     @Test
-    public void testSuppressedTransferring() {
+    void testSuppressedTransferring() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
 
-        assertEquals(1, serializedThrowable.getSuppressed().length);
+        assertThat(serializedThrowable.getSuppressed().length).isOne();

Review Comment:
   ```suggestion
           assertThat(serializedThrowable.getSuppressed()).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -86,49 +84,47 @@ public void testSerialization() {
                     String.format(
                             "%s: %s",
                             userException2.getClass().getName(), 
userException2.getMessage());
-            assertEquals(serialized2.getMessage(), result);
+            assertThat(serialized2).hasMessage(result);
 
             // copy the serialized throwable and make sure everything still 
works
             SerializedThrowable copy = 
CommonTestUtils.createCopySerializable(serialized);
-            assertEquals(
-                    ExceptionUtils.stringifyException(userException),
-                    ExceptionUtils.stringifyException(copy));
-            assertArrayEquals(userException.getStackTrace(), 
copy.getStackTrace());
+            assertThat(ExceptionUtils.stringifyException(userException))
+                    .isEqualTo(ExceptionUtils.stringifyException(copy));
+            
assertThat(userException.getStackTrace()).isEqualTo(copy.getStackTrace());
 
             // deserialize the proper exception
             Throwable deserialized = copy.deserializeError(loader);
-            assertEquals(clazz, deserialized.getClass());
+            assertThat(deserialized.getClass()).isEqualTo(clazz);
 
             // deserialization with the wrong classloader does not lead to a 
failure
             Throwable wronglyDeserialized = 
copy.deserializeError(getClass().getClassLoader());
-            assertEquals(
-                    ExceptionUtils.stringifyException(userException),
-                    ExceptionUtils.stringifyException(wronglyDeserialized));
+            assertThat(ExceptionUtils.stringifyException(userException))
+                    
.isEqualTo(ExceptionUtils.stringifyException(wronglyDeserialized));
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
     @Test
-    public void testCauseChaining() {
+    void testCauseChaining() {
         Exception cause2 = new Exception("level2");
         Exception cause1 = new Exception("level1", cause2);
         Exception root = new Exception("level0", cause1);
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertEquals("java.lang.Exception: level0", st.getMessage());
+        assertThat(st).hasMessage("java.lang.Exception: level0");
 
-        assertNotNull(st.getCause());
-        assertEquals("java.lang.Exception: level1", 
st.getCause().getMessage());
+        assertThat(st.getCause()).isNotNull();
+        assertThat(st.getCause()).hasMessage("java.lang.Exception: level1");
 
-        assertNotNull(st.getCause().getCause());
-        assertEquals("java.lang.Exception: level2", 
st.getCause().getCause().getMessage());
+        assertThat(st.getCause().getCause()).isNotNull();
+        assertThat(st.getCause().getCause()).hasMessage("java.lang.Exception: 
level2");

Review Comment:
   ```suggestion
           
assertThat(st.getCause().getCause()).isNotNull().hasMessage("java.lang.Exception:
 level2");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -139,53 +135,53 @@ public void testCyclicCauseChaining() {
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertArrayEquals(root.getStackTrace(), st.getStackTrace());
-        assertEquals(
-                ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+        assertThat(root.getStackTrace()).isEqualTo(st.getStackTrace());
+        assertThat(ExceptionUtils.stringifyException(root))
+                .isEqualTo(ExceptionUtils.stringifyException(st));
     }
 
     @Test
-    public void testCopyPreservesCause() {
+    void testCopyPreservesCause() {
         Exception original = new Exception("original message");
         Exception parent = new Exception("parent message", original);
 
         SerializedThrowable serialized = new SerializedThrowable(parent);
-        assertNotNull(serialized.getCause());
+        assertThat(serialized.getCause()).isNotNull();
 
         SerializedThrowable copy = new SerializedThrowable(serialized);
-        assertEquals(
-                "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message",
-                copy.getMessage());
-        assertNotNull(copy.getCause());
-        assertEquals("java.lang.Exception: original message", 
copy.getCause().getMessage());
+        assertThat(copy)
+                .hasMessage(
+                        "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message");
+        assertThat(copy.getCause()).isNotNull();
+        assertThat(copy.getCause()).hasMessage("java.lang.Exception: original 
message");
     }
 
     @Test
-    public void testSuppressedTransferring() {
+    void testSuppressedTransferring() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
 
-        assertEquals(1, serializedThrowable.getSuppressed().length);
+        assertThat(serializedThrowable.getSuppressed().length).isOne();
         Throwable actualSuppressed = serializedThrowable.getSuppressed()[0];
-        assertTrue(actualSuppressed instanceof SerializedThrowable);
-        assertEquals("java.lang.Exception: suppressed", 
actualSuppressed.getMessage());
+        assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class);
+        assertThat(actualSuppressed).hasMessage("java.lang.Exception: 
suppressed");

Review Comment:
   ```suggestion
           
assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class).hasMessage("java.lang.Exception:
 suppressed");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java:
##########
@@ -139,53 +135,53 @@ public void testCyclicCauseChaining() {
 
         SerializedThrowable st = new SerializedThrowable(root);
 
-        assertArrayEquals(root.getStackTrace(), st.getStackTrace());
-        assertEquals(
-                ExceptionUtils.stringifyException(root), 
ExceptionUtils.stringifyException(st));
+        assertThat(root.getStackTrace()).isEqualTo(st.getStackTrace());
+        assertThat(ExceptionUtils.stringifyException(root))
+                .isEqualTo(ExceptionUtils.stringifyException(st));
     }
 
     @Test
-    public void testCopyPreservesCause() {
+    void testCopyPreservesCause() {
         Exception original = new Exception("original message");
         Exception parent = new Exception("parent message", original);
 
         SerializedThrowable serialized = new SerializedThrowable(parent);
-        assertNotNull(serialized.getCause());
+        assertThat(serialized.getCause()).isNotNull();
 
         SerializedThrowable copy = new SerializedThrowable(serialized);
-        assertEquals(
-                "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message",
-                copy.getMessage());
-        assertNotNull(copy.getCause());
-        assertEquals("java.lang.Exception: original message", 
copy.getCause().getMessage());
+        assertThat(copy)
+                .hasMessage(
+                        "org.apache.flink.util.SerializedThrowable: 
java.lang.Exception: parent message");
+        assertThat(copy.getCause()).isNotNull();
+        assertThat(copy.getCause()).hasMessage("java.lang.Exception: original 
message");
     }
 
     @Test
-    public void testSuppressedTransferring() {
+    void testSuppressedTransferring() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
 
-        assertEquals(1, serializedThrowable.getSuppressed().length);
+        assertThat(serializedThrowable.getSuppressed().length).isOne();
         Throwable actualSuppressed = serializedThrowable.getSuppressed()[0];
-        assertTrue(actualSuppressed instanceof SerializedThrowable);
-        assertEquals("java.lang.Exception: suppressed", 
actualSuppressed.getMessage());
+        assertThat(actualSuppressed).isInstanceOf(SerializedThrowable.class);
+        assertThat(actualSuppressed).hasMessage("java.lang.Exception: 
suppressed");
     }
 
     @Test
-    public void testCopySuppressed() {
+    void testCopySuppressed() {
         Exception root = new Exception("root");
         Exception suppressed = new Exception("suppressed");
         root.addSuppressed(suppressed);
 
         SerializedThrowable serializedThrowable = new 
SerializedThrowable(root);
         SerializedThrowable copy = new 
SerializedThrowable(serializedThrowable);
 
-        assertEquals(1, copy.getSuppressed().length);
+        assertThat(copy.getSuppressed().length).isOne();

Review Comment:
   ```suggestion
           assertThat(copy.getSuppressed()).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java:
##########
@@ -132,27 +131,33 @@ public void testGetWeightsFromConfigLegacy() {
                 ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(
                         CONFIG_WITH_LEGACY_USE_CASES);
 
-        assertThat(configuredWeights, is(expectedWeights));
+        assertThat(configuredWeights).isEqualTo(expectedWeights);
     }
 
-    @Test(expected = IllegalConfigurationException.class)
-    public void testGetWeightsFromConfigFailNegativeWeight() {
-        final Configuration config =
-                new Configuration() {
-                    {
-                        set(
-                                
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
-                                Collections.singletonMap(
-                                        
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
-                                        "-123"));
-                    }
-                };
+    @Test
+    void testGetWeightsFromConfigFailNegativeWeight() {
+        assertThatThrownBy(
+                        () -> {
+                            final Configuration config =
+                                    new Configuration() {
+                                        {
+                                            set(
+                                                    TaskManagerOptions
+                                                            
.MANAGED_MEMORY_CONSUMER_WEIGHTS,
+                                                    Collections.singletonMap(
+                                                            TaskManagerOptions
+                                                                    
.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+                                                            "-123"));
+                                        }
+                                    };

Review Comment:
   The config can be removed from this block, right?
   
   The expected `IllegalConfigurationException` should be thrown from 
`ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(config);`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java:
##########
@@ -287,22 +297,27 @@ public void 
testUseCaseWeightsConfiguredWithConsistentValue() {
         ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, 
newWeights);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testUseCaseWeightsConfiguredWithConflictValue() {
-        final Map<ManagedMemoryUseCase, Integer> existingWeights =
-                new HashMap<ManagedMemoryUseCase, Integer>() {
-                    {
-                        put(ManagedMemoryUseCase.OPERATOR, 123);
-                    }
-                };
+    @Test
+    void testUseCaseWeightsConfiguredWithConflictValue() {
+        assertThatThrownBy(
+                        () -> {
+                            final Map<ManagedMemoryUseCase, Integer> 
existingWeights =
+                                    new HashMap<ManagedMemoryUseCase, 
Integer>() {
+                                        {
+                                            put(ManagedMemoryUseCase.OPERATOR, 
123);
+                                        }
+                                    };
 
-        final Map<ManagedMemoryUseCase, Integer> newWeights =
-                new HashMap<ManagedMemoryUseCase, Integer>() {
-                    {
-                        put(ManagedMemoryUseCase.OPERATOR, 456);
-                    }
-                };
+                            final Map<ManagedMemoryUseCase, Integer> 
newWeights =
+                                    new HashMap<ManagedMemoryUseCase, 
Integer>() {
+                                        {
+                                            put(ManagedMemoryUseCase.OPERATOR, 
456);
+                                        }
+                                    };
 
-        ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, 
newWeights);
+                            
ManagedMemoryUtils.validateUseCaseWeightsNotConflict(
+                                    existingWeights, newWeights);
+                        })
+                .isInstanceOf(IllegalStateException.class);

Review Comment:
   Same to the last comment, the `existingWeights` and `newWeights` should be 
removed from this block.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java:
##########
@@ -63,86 +62,83 @@ protected ProcessMemoryUtilsTestBase(
         this.newOptionForLegacyHeapOption = 
checkNotNull(newOptionForLegacyHeapOption);
     }
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         oldEnvVariables = System.getenv();
     }
 
-    @After
-    public void teardown() {
+    @AfterEach
+    void teardown() {
         if (oldEnvVariables != null) {
             CommonTestUtils.setEnv(oldEnvVariables, true);
         }
     }
 
     @Test
-    public void testGenerateJvmParameters() {
+    void testGenerateJvmParameters() {
         ProcessMemorySpec spec = JvmArgTestingProcessMemorySpec.generate();
         String jvmParamsStr = 
ProcessMemoryUtils.generateJvmParametersStr(spec, true);
         Map<String, String> configs = 
ConfigurationUtils.parseJvmArgString(jvmParamsStr);
 
-        assertThat(configs.size(), is(4));
-        assertThat(MemorySize.parse(configs.get("-Xmx")), 
is(spec.getJvmHeapMemorySize()));
-        assertThat(MemorySize.parse(configs.get("-Xms")), 
is(spec.getJvmHeapMemorySize()));
-        assertThat(
-                MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")),
-                is(spec.getJvmMetaspaceSize()));
-        assertThat(
-                MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")),
-                is(spec.getJvmDirectMemorySize()));
+        assertThat(configs).hasSize(4);
+        
assertThat(MemorySize.parse(configs.get("-Xmx"))).isEqualTo(spec.getJvmHeapMemorySize());
+        
assertThat(MemorySize.parse(configs.get("-Xms"))).isEqualTo(spec.getJvmHeapMemorySize());
+        assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")))
+                .isEqualTo(spec.getJvmMetaspaceSize());
+        assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")))
+                .isEqualTo(spec.getJvmDirectMemorySize());
     }
 
     @Test
-    public void testGenerateJvmParametersWithoutDirectMemoryLimit() {
+    void testGenerateJvmParametersWithoutDirectMemoryLimit() {
         ProcessMemorySpec spec = JvmArgTestingProcessMemorySpec.generate();
         String jvmParamsStr = 
ProcessMemoryUtils.generateJvmParametersStr(spec, false);
         Map<String, String> configs = 
ConfigurationUtils.parseJvmArgString(jvmParamsStr);
 
-        assertThat(configs.size(), is(3));
-        assertThat(MemorySize.parse(configs.get("-Xmx")), 
is(spec.getJvmHeapMemorySize()));
-        assertThat(MemorySize.parse(configs.get("-Xms")), 
is(spec.getJvmHeapMemorySize()));
-        assertThat(
-                MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")),
-                is(spec.getJvmMetaspaceSize()));
-        assertThat(configs.containsKey("-XX:MaxDirectMemorySize="), is(false));
+        assertThat(configs).hasSize(3);
+        
assertThat(MemorySize.parse(configs.get("-Xmx"))).isEqualTo(spec.getJvmHeapMemorySize());
+        
assertThat(MemorySize.parse(configs.get("-Xms"))).isEqualTo(spec.getJvmHeapMemorySize());
+        assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")))
+                .isEqualTo(spec.getJvmMetaspaceSize());
+        assertThat(configs.containsKey("-XX:MaxDirectMemorySize=")).isFalse();
     }
 
     @Test
-    public void testConfigTotalFlinkMemory() {
+    void testConfigTotalFlinkMemory() {
         MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
 
         Configuration conf = new Configuration();
         conf.set(options.getTotalFlinkMemoryOption(), totalFlinkMemorySize);
 
         T processSpec = processSpecFromConfig(conf);
-        assertThat(processSpec.getTotalFlinkMemorySize(), 
is(totalFlinkMemorySize));
+        
assertThat(processSpec.getTotalFlinkMemorySize()).isEqualTo(totalFlinkMemorySize);
     }
 
     @Test
-    public void testConfigTotalProcessMemorySize() {
+    void testConfigTotalProcessMemorySize() {
         MemorySize totalProcessMemorySize = MemorySize.parse("2g");
 
         Configuration conf = new Configuration();
         conf.set(options.getTotalProcessMemoryOption(), 
totalProcessMemorySize);
 
         T processSpec = processSpecFromConfig(conf);
-        assertThat(processSpec.getTotalProcessMemorySize(), 
is(totalProcessMemorySize));
+        
assertThat(processSpec.getTotalProcessMemorySize()).isEqualTo(totalProcessMemorySize);
     }
 
     @Test
-    public void testExceptionShouldContainRequiredConfigOptions() {
+    void testExceptionShouldContainRequiredConfigOptions() {
         try {
             processSpecFromConfig(new Configuration());
         } catch (IllegalConfigurationException e) {
             options.getRequiredFineGrainedOptions()
-                    .forEach(option -> assertThat(e.getMessage(), 
containsString(option.key())));
-            assertThat(e.getMessage(), 
containsString(options.getTotalFlinkMemoryOption().key()));
-            assertThat(e.getMessage(), 
containsString(options.getTotalProcessMemoryOption().key()));
+                    .forEach(option -> 
assertThat(e).hasMessageContaining(option.key()));
+            
assertThat(e).hasMessageContaining(options.getTotalFlinkMemoryOption().key());
+            
assertThat(e).hasMessageContaining(options.getTotalProcessMemoryOption().key());

Review Comment:
   ```suggestion
               
assertThat(e).hasMessageContaining(options.getTotalProcessMemoryOption().key());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to