This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c884ab4837 [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.netty package of flink-runtime module
1c884ab4837 is described below

commit 1c884ab48372f7a66f86c28aeaf9518000c7f357
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Fri Dec 15 14:57:18 2023 +0800

    [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.netty package of flink-runtime module
    
    This closes #23607.
---
 .../runtime/io/network/netty/ByteBufUtilsTest.java |  42 +++--
 .../network/netty/CancelPartitionRequestTest.java  |  38 ++---
 .../netty/ClientTransportErrorHandlingTest.java    |   8 +-
 ...editBasedPartitionRequestClientHandlerTest.java |  10 +-
 ...CreditBasedSequenceNumberingViewReaderTest.java |  32 ++--
 .../io/network/netty/NettyBufferPoolTest.java      |  51 +++---
 .../io/network/netty/NettyClientServerSslTest.java | 123 +++++++-------
 .../runtime/io/network/netty/NettyClientTest.java  |   2 +-
 .../network/netty/NettyConnectionManagerTest.java  |  38 ++---
 .../NettyMessageClientDecoderDelegateTest.java     |  39 +++--
 .../NettyMessageClientSideSerializationTest.java   |   3 -
 .../NettyMessageServerSideSerializationTest.java   |  65 ++++----
 .../netty/NettyPartitionRequestClientTest.java     | 120 +++++++-------
 .../netty/NettyServerFromPortRangeTest.java        |   2 +-
 .../runtime/io/network/netty/NettyTestUtil.java    |  21 ++-
 .../netty/PartitionRequestClientFactoryTest.java   |  57 +++----
 .../network/netty/PartitionRequestQueueTest.java   | 177 ++++++++++-----------
 .../netty/PartitionRequestServerHandlerTest.java   |  30 ++--
 .../netty/ServerTransportErrorHandlingTest.java    |  20 +--
 19 files changed, 420 insertions(+), 458 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
index 4fcf083f3db..146057025ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
@@ -18,24 +18,20 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.util.TestLogger;
-
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests the methods in {@link ByteBufUtils}. */
-public class ByteBufUtilsTest extends TestLogger {
+class ByteBufUtilsTest {
     private static final byte ACCUMULATION_BYTE = 0x7d;
     private static final byte NON_ACCUMULATION_BYTE = 0x23;
 
     @Test
-    public void testAccumulateWithoutCopy() {
+    void testAccumulateWithoutCopy() {
         int sourceLength = 128;
         int sourceReaderIndex = 32;
         int expectedAccumulationSize = 16;
@@ -49,13 +45,13 @@ public class ByteBufUtilsTest extends TestLogger {
                 ByteBufUtils.accumulate(
                         target, src, expectedAccumulationSize, 
target.readableBytes());
 
-        assertSame(src, accumulated);
-        assertEquals(sourceReaderIndex, src.readerIndex());
+        assertThat(accumulated).isSameAs(src);
+        assertThat(src.readerIndex()).isEqualTo(sourceReaderIndex);
         verifyBufferContent(src, sourceReaderIndex, expectedAccumulationSize);
     }
 
     @Test
-    public void testAccumulateWithCopy() {
+    void testAccumulateWithCopy() {
         int sourceLength = 128;
         int firstSourceReaderIndex = 32;
         int secondSourceReaderIndex = 0;
@@ -76,9 +72,9 @@ public class ByteBufUtilsTest extends TestLogger {
         ByteBuf accumulated =
                 ByteBufUtils.accumulate(
                         target, firstSource, expectedAccumulationSize, 
target.readableBytes());
-        assertNull(accumulated);
-        assertEquals(sourceLength, firstSource.readerIndex());
-        assertEquals(firstAccumulationSize, target.readableBytes());
+        assertThat(accumulated).isNull();
+        assertThat(firstSource.readerIndex()).isEqualTo(sourceLength);
+        assertThat(target.readableBytes()).isEqualTo(firstAccumulationSize);
 
         // The remaining data will be copied from the second buffer, and the 
target buffer will be
         // returned
@@ -86,9 +82,10 @@ public class ByteBufUtilsTest extends TestLogger {
         accumulated =
                 ByteBufUtils.accumulate(
                         target, secondSource, expectedAccumulationSize, 
target.readableBytes());
-        assertSame(target, accumulated);
-        assertEquals(secondSourceReaderIndex + secondAccumulationSize, 
secondSource.readerIndex());
-        assertEquals(expectedAccumulationSize, target.readableBytes());
+        assertThat(accumulated).isSameAs(target);
+        assertThat(secondSource.readerIndex())
+                .isEqualTo(secondSourceReaderIndex + secondAccumulationSize);
+        assertThat(target.readableBytes()).isEqualTo(expectedAccumulationSize);
 
         verifyBufferContent(accumulated, 0, expectedAccumulationSize);
     }
@@ -103,7 +100,7 @@ public class ByteBufUtilsTest extends TestLogger {
      * @param accumulationSize The size of bytes that will be read for 
accumulating.
      * @return The required source buffer.
      */
-    private ByteBuf createSourceBuffer(int size, int readerIndex, int 
accumulationSize) {
+    private static ByteBuf createSourceBuffer(int size, int readerIndex, int 
accumulationSize) {
         ByteBuf buf = Unpooled.buffer(size);
 
         for (int i = 0; i < readerIndex; i++) {
@@ -122,13 +119,12 @@ public class ByteBufUtilsTest extends TestLogger {
         return buf;
     }
 
-    private void verifyBufferContent(ByteBuf buf, int start, int length) {
+    private static void verifyBufferContent(ByteBuf buf, int start, int 
length) {
         for (int i = 0; i < length; ++i) {
             byte b = buf.getByte(start + i);
-            assertEquals(
-                    String.format("The byte at position %d is not right.", 
start + i),
-                    ACCUMULATION_BYTE,
-                    b);
+            assertThat(b)
+                    .withFailMessage("The byte at position %d is not right.", 
start + i)
+                    .isEqualTo(ACCUMULATION_BYTE);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 7bfa0d28f85..a07e6d2aedc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.testutils.TestingUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.stubbing.Answer;
 
 import javax.annotation.Nullable;
@@ -50,16 +50,16 @@ import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class CancelPartitionRequestTest {
+class CancelPartitionRequestTest {
 
     /**
      * Verifies that requests for non-existing (failed/cancelled) input 
channels are properly
@@ -67,7 +67,7 @@ public class CancelPartitionRequestTest {
      * This should cancel the request.
      */
     @Test
-    public void testCancelPartitionRequest() throws Exception {
+    void testCancelPartitionRequest() throws Exception {
 
         NettyServerAndClient serverAndClient = null;
 
@@ -104,12 +104,12 @@ public class CancelPartitionRequestTest {
                     .await();
 
             // Wait for the notification
-            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), 
TimeUnit.MILLISECONDS)) {
-                fail(
-                        "Timed out after waiting for "
-                                + TestingUtils.TESTING_DURATION.toMillis()
-                                + " ms to be notified about cancelled 
partition.");
-            }
+            assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), 
TimeUnit.MILLISECONDS))
+                    .withFailMessage(
+                            "Timed out after waiting for "
+                                    + TestingUtils.TESTING_DURATION.toMillis()
+                                    + " ms to be notified about cancelled 
partition.")
+                    .isTrue();
 
             verify(view, times(1)).releaseAllResources();
         } finally {
@@ -118,7 +118,7 @@ public class CancelPartitionRequestTest {
     }
 
     @Test
-    public void testDuplicateCancel() throws Exception {
+    void testDuplicateCancel() throws Exception {
 
         NettyServerAndClient serverAndClient = null;
 
@@ -163,12 +163,12 @@ public class CancelPartitionRequestTest {
                     .await();
 
             // Wait for the notification
-            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), 
TimeUnit.MILLISECONDS)) {
-                fail(
-                        "Timed out after waiting for "
-                                + TestingUtils.TESTING_DURATION.toMillis()
-                                + " ms to be notified about cancelled 
partition.");
-            }
+            assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), 
TimeUnit.MILLISECONDS))
+                    .withFailMessage(
+                            "Timed out after waiting for "
+                                    + TestingUtils.TESTING_DURATION.toMillis()
+                                    + " ms to be notified about cancelled 
partition.")
+                    .isTrue();
 
             ch.writeAndFlush(new 
CancelPartitionRequest(inputChannelId)).await();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 47d312f5ffa..978065c48a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -370,7 +370,7 @@ class ClientTransportErrorHandlingTest {
     // Helpers
     // 
---------------------------------------------------------------------------------------------
 
-    private EmbeddedChannel createEmbeddedChannel() {
+    private static EmbeddedChannel createEmbeddedChannel() {
         NettyProtocol protocol =
                 new NettyProtocol(
                         mock(ResultPartitionProvider.class), 
mock(TaskEventDispatcher.class));
@@ -378,7 +378,7 @@ class ClientTransportErrorHandlingTest {
         return new EmbeddedChannel(protocol.getClientChannelHandlers());
     }
 
-    private RemoteInputChannel addInputChannel(NetworkClientHandler 
clientHandler)
+    private static RemoteInputChannel addInputChannel(NetworkClientHandler 
clientHandler)
             throws IOException {
         RemoteInputChannel rich = createRemoteInputChannel();
         clientHandler.addInputChannel(rich);
@@ -386,13 +386,13 @@ class ClientTransportErrorHandlingTest {
         return rich;
     }
 
-    private NetworkClientHandler getClientHandler(Channel ch) {
+    private static NetworkClientHandler getClientHandler(Channel ch) {
         NetworkClientHandler networkClientHandler = 
ch.pipeline().get(NetworkClientHandler.class);
         networkClientHandler.setConnectionId(CONNECTION_ID);
         return networkClientHandler;
     }
 
-    private RemoteInputChannel createRemoteInputChannel() {
+    private static RemoteInputChannel createRemoteInputChannel() {
         return when(mock(RemoteInputChannel.class).getInputChannelId())
                 .thenReturn(new InputChannelID())
                 .getMock();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index f46c25ff5a2..0048cde0b13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -71,7 +71,7 @@ import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -191,7 +191,7 @@ class CreditBasedPartitionRequestClientHandlerTest {
                             new NetworkBufferAllocator(handler));
             handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse);
 
-            assertThat(inputChannel.getNumberOfQueuedBuffers()).isEqualTo(1);
+            assertThat(inputChannel.getNumberOfQueuedBuffers()).isOne();
             assertThat(inputChannel.getSenderBacklog()).isEqualTo(2);
         } finally {
             releaseResource(inputGate, networkBufferPool);
@@ -307,7 +307,7 @@ class CreditBasedPartitionRequestClientHandlerTest {
 
         assertThat(inputChannel.getNumberOfAvailableBuffers())
                 .as("There should be no buffers available in the channel.")
-                .isEqualTo(0);
+                .isZero();
 
         final BufferResponse bufferResponse =
                 createBufferResponse(
@@ -471,7 +471,7 @@ class CreditBasedPartitionRequestClientHandlerTest {
                             allocator);
             handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse3);
 
-            assertThat(inputChannels[0].getUnannouncedCredit()).isEqualTo(1);
+            assertThat(inputChannels[0].getUnannouncedCredit()).isOne();
             assertThat(inputChannels[1].getUnannouncedCredit()).isZero();
 
             channel.runPendingTasks();
@@ -488,7 +488,7 @@ class CreditBasedPartitionRequestClientHandlerTest {
             assertThat(channel.isWritable()).isTrue();
             readFromOutbound = channel.readOutbound();
             assertThat(readFromOutbound).isInstanceOf(AddCredit.class);
-            assertThat(((AddCredit) readFromOutbound).credit).isEqualTo(1);
+            assertThat(((AddCredit) readFromOutbound).credit).isOne();
             assertThat(inputChannels[0].getUnannouncedCredit()).isZero();
             assertThat(inputChannels[1].getUnannouncedCredit()).isZero();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java
index e8a6c707044..30508cba028 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java
@@ -24,58 +24,56 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link CreditBasedSequenceNumberingViewReader}. */
-public class CreditBasedSequenceNumberingViewReaderTest {
+class CreditBasedSequenceNumberingViewReaderTest {
 
     @Test
-    public void testResumeConsumption() throws Exception {
+    void testResumeConsumption() throws Exception {
         int numCredits = 2;
         CreditBasedSequenceNumberingViewReader reader1 =
                 createNetworkSequenceViewReader(numCredits);
 
         reader1.resumeConsumption();
-        assertEquals(numCredits, reader1.getNumCreditsAvailable());
+        assertThat(reader1.getNumCreditsAvailable()).isEqualTo(numCredits);
 
         reader1.addCredit(numCredits);
         reader1.resumeConsumption();
-        assertEquals(2 * numCredits, reader1.getNumCreditsAvailable());
+        assertThat(reader1.getNumCreditsAvailable()).isEqualTo(2 * numCredits);
 
         CreditBasedSequenceNumberingViewReader reader2 = 
createNetworkSequenceViewReader(0);
 
         reader2.addCredit(numCredits);
-        assertEquals(numCredits, reader2.getNumCreditsAvailable());
+        assertThat(reader2.getNumCreditsAvailable()).isEqualTo(numCredits);
 
         reader2.resumeConsumption();
-        assertEquals(0, reader2.getNumCreditsAvailable());
+        assertThat(reader2.getNumCreditsAvailable()).isZero();
     }
 
     @Test
-    public void testNeedAnnounceBacklog() throws Exception {
+    void testNeedAnnounceBacklog() throws Exception {
         int numCredits = 2;
         CreditBasedSequenceNumberingViewReader reader1 =
                 createNetworkSequenceViewReader(numCredits);
 
-        assertFalse(reader1.needAnnounceBacklog());
+        assertThat(reader1.needAnnounceBacklog()).isFalse();
         reader1.addCredit(-numCredits);
-        assertFalse(reader1.needAnnounceBacklog());
+        assertThat(reader1.needAnnounceBacklog()).isFalse();
 
         CreditBasedSequenceNumberingViewReader reader2 = 
createNetworkSequenceViewReader(0);
-        assertTrue(reader2.needAnnounceBacklog());
+        assertThat(reader2.needAnnounceBacklog()).isTrue();
 
         reader2.addCredit(numCredits);
-        assertFalse(reader2.needAnnounceBacklog());
+        assertThat(reader2.needAnnounceBacklog()).isFalse();
 
         reader2.addCredit(-numCredits);
-        assertTrue(reader2.needAnnounceBacklog());
+        assertThat(reader2.needAnnounceBacklog()).isTrue();
     }
 
-    private CreditBasedSequenceNumberingViewReader 
createNetworkSequenceViewReader(
+    private static CreditBasedSequenceNumberingViewReader 
createNetworkSequenceViewReader(
             int initialCredit) throws Exception {
         PartitionRequestQueue queue = new PartitionRequestQueue();
         EmbeddedChannel channel = new EmbeddedChannel(queue);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
index 0aaa11ab64c..c66d8a33390 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
@@ -20,22 +20,21 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link NettyBufferPool} wrapper. */
-public class NettyBufferPoolTest {
+class NettyBufferPoolTest {
 
     private final List<ByteBuf> needReleasing = new ArrayList<>();
 
-    @After
-    public void tearDown() {
+    @AfterEach
+    void tearDown() {
         try {
             // Release all of the buffers.
             for (ByteBuf buf : needReleasing) {
@@ -44,7 +43,7 @@ public class NettyBufferPoolTest {
 
             // Checks in a separate loop in case we have sliced buffers.
             for (ByteBuf buf : needReleasing) {
-                assertEquals(0, buf.refCnt());
+                assertThat(buf.refCnt()).isZero();
             }
         } finally {
             needReleasing.clear();
@@ -52,49 +51,49 @@ public class NettyBufferPoolTest {
     }
 
     @Test
-    public void testNoHeapAllocations() throws Exception {
+    void testNoHeapAllocations() {
         final NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
 
         // Buffers should prefer to be direct
-        assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect());
+        assertThat(releaseLater(nettyBufferPool.buffer()).isDirect()).isTrue();
+        
assertThat(releaseLater(nettyBufferPool.buffer(128)).isDirect()).isTrue();
+        assertThat(releaseLater(nettyBufferPool.buffer(128, 
256)).isDirect()).isTrue();
 
         // IO buffers should prefer to be direct
-        assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 
256)).isDirect());
+        
assertThat(releaseLater(nettyBufferPool.ioBuffer()).isDirect()).isTrue();
+        
assertThat(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()).isTrue();
+        assertThat(releaseLater(nettyBufferPool.ioBuffer(128, 
256)).isDirect()).isTrue();
 
         // Currently we fakes the heap buffer allocation with direct buffers
-        assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect());
-        assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 
256)).isDirect());
+        
assertThat(releaseLater(nettyBufferPool.heapBuffer()).isDirect()).isTrue();
+        
assertThat(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()).isTrue();
+        assertThat(releaseLater(nettyBufferPool.heapBuffer(128, 
256)).isDirect()).isTrue();
 
         // Composite buffers allocates the corresponding type of buffers when 
extending its capacity
-        
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect());
-        
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect());
+        
assertThat(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect())
+                .isTrue();
+        
assertThat(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect())
+                .isTrue();
 
         // Is direct buffer pooled!
-        assertTrue(nettyBufferPool.isDirectBufferPooled());
+        assertThat(nettyBufferPool.isDirectBufferPooled()).isTrue();
     }
 
     @Test
-    public void testAllocationsStatistics() throws Exception {
+    void testAllocationsStatistics() throws Exception {
         NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
         int chunkSize = nettyBufferPool.getChunkSize();
 
         {
             // Single large buffer allocates one chunk
             releaseLater(nettyBufferPool.directBuffer(chunkSize - 64));
-            long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
-            assertEquals(chunkSize, allocated);
+            
assertThat(nettyBufferPool.getNumberOfAllocatedBytes()).hasValue((long) 
chunkSize);
         }
 
         {
             // Allocate a little more (one more chunk required)
             releaseLater(nettyBufferPool.directBuffer(128));
-            long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
-            assertEquals(2 * chunkSize, allocated);
+            
assertThat(nettyBufferPool.getNumberOfAllocatedBytes()).hasValue(2L * 
chunkSize);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 58ba3e4785a..d0b0104d168 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -24,8 +24,10 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import org.apache.flink.runtime.net.SSLUtilsTest;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -33,13 +35,12 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecode
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.net.ssl.SSLSessionContext;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.time.Duration;
 import java.util.List;
@@ -48,31 +49,29 @@ import static 
org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_CLOSE_
 import static 
org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT;
 import static 
org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE;
 import static 
org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the SSL connection between Netty Server and Client used for the 
data plane. */
-@RunWith(Parameterized.class)
-public class NettyClientServerSslTest extends TestLogger {
+@ExtendWith(ParameterizedTestExtension.class)
+class NettyClientServerSslTest {
 
-    @Parameterized.Parameter public String sslProvider;
+    @Parameter private String sslProvider;
 
-    @Parameterized.Parameters(name = "SSL provider = {0}")
+    @Parameters(name = "SSL provider = {0}")
     public static List<String> parameters() {
         return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS;
     }
 
     /** Verify valid ssl configuration and connection. */
-    @Test
-    public void testValidSslConnection() throws Exception {
+    @TestTemplate
+    void testValidSslConnection() throws Exception {
         testValidSslConnection(createSslConfig());
     }
 
     /** Verify valid (advanced) ssl configuration and connection. */
-    @Test
-    public void testValidSslConnectionAdvanced() throws Exception {
+    @TestTemplate
+    void testValidSslConnectionAdvanced() throws Exception {
         Configuration sslConfig = createSslConfig();
         sslConfig.setInteger(SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
 
@@ -110,8 +109,9 @@ public class NettyClientServerSslTest extends TestLogger {
             final NettyClient client = NettyTestUtil.initClient(nettyConfig, 
protocol, bufferPool);
             serverAndClient = new NettyServerAndClient(server, client);
         }
-        Assert.assertNotNull(
-                "serverAndClient is null due to fail to get a free port", 
serverAndClient);
+        assertThat(serverAndClient)
+                .withFailMessage("serverAndClient is null due to fail to get a 
free port")
+                .isNotNull();
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
 
@@ -132,7 +132,7 @@ public class NettyClientServerSslTest extends TestLogger {
         // session context is only be available after a session was setup -> 
this should be true
         // after data was sent
         serverChannelInitComplete.await();
-        assertNotNull(serverSslHandler[0]);
+        assertThat(serverSslHandler[0]).isNotNull();
 
         // verify server parameters
         assertEqualsOrDefault(
@@ -145,7 +145,9 @@ public class NettyClientServerSslTest extends TestLogger {
                 serverSslHandler[0].getCloseNotifyFlushTimeoutMillis());
         SSLSessionContext sessionContext =
                 serverSslHandler[0].engine().getSession().getSessionContext();
-        assertNotNull("bug in unit test setup: session context not available", 
sessionContext);
+        assertThat(sessionContext)
+                .withFailMessage("bug in unit test setup: session context not 
available")
+                .isNotNull();
         // note: can't verify session cache setting at the client - delegate 
to server instead (with
         // our own channel initializer)
         assertEqualsOrDefault(
@@ -153,11 +155,11 @@ public class NettyClientServerSslTest extends TestLogger {
         int sessionTimeout = 
sslConfig.getInteger(SSL_INTERNAL_SESSION_TIMEOUT);
         if (sessionTimeout != -1) {
             // session timeout config is in milliseconds but the context 
returns it in seconds
-            assertEquals(sessionTimeout / 1000, 
sessionContext.getSessionTimeout());
+            
assertThat(sessionContext.getSessionTimeout()).isEqualTo(sessionTimeout / 1000);
         } else {
-            assertTrue(
-                    "default value (-1) should not be propagated",
-                    sessionContext.getSessionTimeout() >= 0);
+            assertThat(sessionContext.getSessionTimeout())
+                    .withFailMessage("default value (-1) should not be 
propagated")
+                    .isGreaterThanOrEqualTo(0);
         }
 
         NettyTestUtil.shutdown(serverAndClient);
@@ -167,16 +169,17 @@ public class NettyClientServerSslTest extends TestLogger {
             Configuration sslConfig, ConfigOption<Integer> option, long 
actual) {
         long expected = sslConfig.getInteger(option);
         if (expected != option.defaultValue()) {
-            assertEquals(expected, actual);
+            assertThat(actual).isEqualTo(expected);
         } else {
-            assertTrue(
-                    "default value (" + option.defaultValue() + ") should not 
be propagated",
-                    actual >= 0);
+            assertThat(actual)
+                    .withFailMessage(
+                            "default value (%d) should not be propagated", 
option.defaultValue())
+                    .isGreaterThanOrEqualTo(0);
         }
     }
 
     /** Verify failure on invalid ssl configuration. */
-    @Test
+    @TestTemplate
     public void testInvalidSslConfiguration() throws Exception {
         NettyProtocol protocol = new NettyTestUtil.NoOpProtocol();
 
@@ -184,22 +187,18 @@ public class NettyClientServerSslTest extends TestLogger {
         // Modify the keystore password to an incorrect one
         config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, 
"invalidpassword");
 
-        NettyTestUtil.NettyServerAndClient serverAndClient = null;
         try (NetUtils.Port port = NetUtils.getAvailablePort()) {
             NettyConfig nettyConfig = createNettyConfig(config, port);
 
-            serverAndClient = NettyTestUtil.initServerAndClient(protocol, 
nettyConfig);
-            Assert.fail("Created server and client from invalid 
configuration");
-        } catch (Exception e) {
-            // Exception should be thrown as expected
+            assertThatThrownBy(() -> 
NettyTestUtil.initServerAndClient(protocol, nettyConfig))
+                    .withFailMessage("Created server and client from invalid 
configuration")
+                    .isInstanceOf(IOException.class);
         }
-
-        NettyTestUtil.shutdown(serverAndClient);
     }
 
     /** Verify SSL handshake error when untrusted server certificate is used. 
*/
-    @Test
-    public void testSslHandshakeError() throws Exception {
+    @TestTemplate
+    void testSslHandshakeError() throws Exception {
         NettyProtocol protocol = new NettyTestUtil.NoOpProtocol();
 
         Configuration config = createSslConfig();
@@ -214,19 +213,20 @@ public class NettyClientServerSslTest extends TestLogger {
 
             serverAndClient = NettyTestUtil.initServerAndClient(protocol, 
nettyConfig);
         }
-        Assert.assertNotNull(
-                "serverAndClient is null due to fail to get a free port", 
serverAndClient);
+        assertThat(serverAndClient)
+                .withFailMessage("serverAndClient is null due to fail to get a 
free port")
+                .isNotNull();
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
 
         // Attempting to write data over ssl should fail
-        assertFalse(ch.writeAndFlush("test").await().isSuccess());
+        assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse();
 
         NettyTestUtil.shutdown(serverAndClient);
     }
 
-    @Test
-    public void testClientUntrustedCertificate() throws Exception {
+    @TestTemplate
+    void testClientUntrustedCertificate() throws Exception {
         final Configuration serverConfig = createSslConfig();
         final Configuration clientConfig = createSslConfig();
 
@@ -249,20 +249,21 @@ public class NettyClientServerSslTest extends TestLogger {
                     NettyTestUtil.initClient(nettyClientConfig, protocol, 
bufferPool);
             serverAndClient = new NettyServerAndClient(server, client);
         }
-        Assert.assertNotNull(
-                "serverAndClient is null due to fail to get a free port", 
serverAndClient);
+        assertThat(serverAndClient)
+                .withFailMessage("serverAndClient is null due to fail to get a 
free port")
+                .isNotNull();
 
         final Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
 
         // Attempting to write data over ssl should fail
-        assertFalse(ch.writeAndFlush("test").await().isSuccess());
+        assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse();
 
         NettyTestUtil.shutdown(serverAndClient);
     }
 
-    @Test
-    public void testSslPinningForValidFingerprint() throws Exception {
+    @TestTemplate
+    void testSslPinningForValidFingerprint() throws Exception {
         NettyProtocol protocol = new NettyTestUtil.NoOpProtocol();
 
         Configuration config = createSslConfig();
@@ -277,19 +278,20 @@ public class NettyClientServerSslTest extends TestLogger {
 
             serverAndClient = NettyTestUtil.initServerAndClient(protocol, 
nettyConfig);
         }
-        Assert.assertNotNull(
-                "serverAndClient is null due to fail to get a free port", 
serverAndClient);
+        assertThat(serverAndClient)
+                .withFailMessage("serverAndClient is null due to fail to get a 
free port")
+                .isNotNull();
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
 
-        assertTrue(ch.writeAndFlush("test").await().isSuccess());
+        assertThat(ch.writeAndFlush("test").await().isSuccess()).isTrue();
 
         NettyTestUtil.shutdown(serverAndClient);
     }
 
-    @Test
-    public void testSslPinningForInvalidFingerprint() throws Exception {
+    @TestTemplate
+    void testSslPinningForInvalidFingerprint() throws Exception {
         NettyProtocol protocol = new NettyTestUtil.NoOpProtocol();
 
         Configuration config = createSslConfig();
@@ -305,13 +307,14 @@ public class NettyClientServerSslTest extends TestLogger {
 
             serverAndClient = NettyTestUtil.initServerAndClient(protocol, 
nettyConfig);
         }
-        Assert.assertNotNull(
-                "serverAndClient is null due to fail to get a free port", 
serverAndClient);
+        assertThat(serverAndClient)
+                .withFailMessage("serverAndClient is null due to fail to get a 
free port")
+                .isNotNull();
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
 
-        assertFalse(ch.writeAndFlush("test").await().isSuccess());
+        assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse();
 
         NettyTestUtil.shutdown(serverAndClient);
     }
@@ -320,8 +323,8 @@ public class NettyClientServerSslTest extends TestLogger {
         return 
SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(sslProvider);
     }
 
-    private static NettyConfig createNettyConfig(Configuration config, 
NetUtils.Port availablePort)
-            throws Exception {
+    private static NettyConfig createNettyConfig(
+            Configuration config, NetUtils.Port availablePort) {
         return new NettyConfig(
                 InetAddress.getLoopbackAddress(),
                 availablePort.getPort(),
@@ -354,7 +357,7 @@ public class NettyClientServerSslTest extends TestLogger {
             super.initChannel(channel);
 
             SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
-            assertNotNull(sslHandler);
+            assertThat(sslHandler).isNotNull();
             serverHandler[0] = sslHandler;
 
             latch.trigger();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java
index 6e29197ede7..dd3d0cff568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java
@@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Tests for {@link NettyClient}. */
-public class NettyClientTest {
+class NettyClientTest {
     @Test
     void testSetKeepaliveOptionWithNioConfigurable() throws Exception {
         assumeThat(keepaliveForNioConfigurable()).isTrue();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index f93381db59d..85015690d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -27,23 +27,22 @@ import 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Simple netty connection manager test. */
-public class NettyConnectionManagerTest {
+class NettyConnectionManagerTest {
 
     /**
      * Tests that the number of arenas and number of threads of the client and 
server are set to the
      * same number, that is the number of configured task slots.
      */
     @Test
-    public void testMatchingNumberOfArenasAndThreadsAsDefault() throws 
Exception {
+    void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
         // Expected number of arenas and threads
         int numberOfSlots = 2;
         NettyConnectionManager connectionManager;
@@ -59,10 +58,11 @@ public class NettyConnectionManagerTest {
             connectionManager = createNettyConnectionManager(config);
             connectionManager.start();
         }
-        assertNotNull(
-                "connectionManager is null due to fail to get a free port", 
connectionManager);
+        assertThat(connectionManager)
+                .withFailMessage("connectionManager is null due to fail to get 
a free port")
+                .isNotNull();
 
-        assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
+        assertThat(connectionManager.getBufferPool().getNumberOfArenas()).isEqualTo(numberOfSlots);
 
         {
             // Client event loop group
@@ -73,7 +73,7 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfSlots, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfSlots);
         }
 
         {
@@ -85,7 +85,7 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfSlots, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfSlots);
         }
 
         {
@@ -97,13 +97,13 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfSlots, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfSlots);
         }
     }
 
     /** Tests that the number of arenas and threads can be configured 
manually. */
     @Test
-    public void testManualConfiguration() throws Exception {
+    void testManualConfiguration() throws Exception {
         // Expected numbers
         int numberOfArenas = 1;
         int numberOfClientThreads = 3;
@@ -123,10 +123,12 @@ public class NettyConnectionManagerTest {
             connectionManager = createNettyConnectionManager(config);
             connectionManager.start();
 
-            assertEquals(numberOfArenas, 
connectionManager.getBufferPool().getNumberOfArenas());
+            assertThat(connectionManager.getBufferPool().getNumberOfArenas())
+                    .isEqualTo(numberOfArenas);
         }
-        assertNotNull(
-                "connectionManager is null due to fail to get a free port", 
connectionManager);
+        assertThat(connectionManager)
+                .withFailMessage("connectionManager is null due to fail to get 
a free port")
+                .isNotNull();
 
         {
             // Client event loop group
@@ -137,7 +139,7 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfClientThreads, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfClientThreads);
         }
 
         {
@@ -149,7 +151,7 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfServerThreads, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfServerThreads);
         }
 
         {
@@ -161,7 +163,7 @@ public class NettyConnectionManagerTest {
             f.setAccessible(true);
             Object[] eventExecutors = (Object[]) f.get(group);
 
-            assertEquals(numberOfServerThreads, eventExecutors.length);
+            assertThat(eventExecutors).hasSize(numberOfServerThreads);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
index 801e478b340..0b008d6300f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
@@ -30,14 +30,13 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
@@ -45,16 +44,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests the client side message decoder. */
-public class NettyMessageClientDecoderDelegateTest extends TestLogger {
+class NettyMessageClientDecoderDelegateTest {
 
     private static final int BUFFER_SIZE = 1024;
 
@@ -72,8 +69,8 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
 
     private InputChannelID releasedInputChannelId;
 
-    @Before
-    public void setup() throws IOException, InterruptedException {
+    @BeforeEach
+    void setup() throws IOException, InterruptedException {
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
         networkBufferPool = new NetworkBufferPool(NUMBER_OF_BUFFER_RESPONSES, 
BUFFER_SIZE);
@@ -97,8 +94,8 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
         releasedInputChannelId = releasedInputChannel.getInputChannelId();
     }
 
-    @After
-    public void tearDown() throws IOException {
+    @AfterEach
+    void tearDown() throws IOException {
         if (inputGate != null) {
             inputGate.close();
         }
@@ -115,7 +112,7 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
 
     /** Verifies that the client side decoder works well for unreleased input 
channels. */
     @Test
-    public void testClientMessageDecode() throws Exception {
+    void testClientMessageDecode() throws Exception {
         testNettyMessageClientDecoding(false, false, false);
     }
 
@@ -124,7 +121,7 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
      * consume data buffers of the input channels.
      */
     @Test
-    public void testClientMessageDecodeWithEmptyBuffers() throws Exception {
+    void testClientMessageDecodeWithEmptyBuffers() throws Exception {
         testNettyMessageClientDecoding(true, false, false);
     }
 
@@ -133,7 +130,7 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
      * channel. The data buffer part should be discarded before reading the 
next message.
      */
     @Test
-    public void testClientMessageDecodeWithReleasedInputChannel() throws 
Exception {
+    void testClientMessageDecodeWithReleasedInputChannel() throws Exception {
         testNettyMessageClientDecoding(false, true, false);
     }
 
@@ -142,7 +139,7 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
      * channel. The data buffer part should be discarded before reading the 
next message.
      */
     @Test
-    public void testClientMessageDecodeWithRemovedInputChannel() throws 
Exception {
+    void testClientMessageDecodeWithRemovedInputChannel() throws Exception {
         testNettyMessageClientDecoding(false, false, true);
     }
 
@@ -318,7 +315,7 @@ public class NettyMessageClientDecoderDelegateTest extends 
TestLogger {
         List<NettyMessage> decodedMessages = new ArrayList<>();
         Object input;
         while ((input = channel.readInbound()) != null) {
-            assertTrue(input instanceof NettyMessage);
+            assertThat(input).isInstanceOf(NettyMessage.class);
             decodedMessages.add((NettyMessage) input);
         }
 
@@ -327,17 +324,17 @@ public class NettyMessageClientDecoderDelegateTest 
extends TestLogger {
 
     private void verifyDecodedMessages(
             List<BufferResponse> expectedMessages, List<NettyMessage> 
decodedMessages) {
-        assertEquals(expectedMessages.size(), decodedMessages.size());
+        assertThat(decodedMessages).hasSameSizeAs(expectedMessages);
         for (int i = 0; i < expectedMessages.size(); ++i) {
-            assertEquals(expectedMessages.get(i).getClass(), decodedMessages.get(i).getClass());
+            assertThat(decodedMessages.get(i)).isInstanceOf(expectedMessages.get(i).getClass());
 
             BufferResponse expected = expectedMessages.get(i);
             BufferResponse actual = (BufferResponse) decodedMessages.get(i);
             verifyBufferResponseHeader(expected, actual);
             if (expected.bufferSize == 0 || 
!expected.receiverId.equals(inputChannelId)) {
-                assertNull(actual.getBuffer());
+                assertThat(actual.getBuffer()).isNull();
             } else {
-                assertEquals(expected.getBuffer(), actual.getBuffer());
+                assertThat(actual.getBuffer()).isEqualTo(expected.getBuffer());
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
index b6927e5c1ec..153836415b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
@@ -30,14 +30,12 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.util.TestLoggerExtension;
 
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -60,7 +58,6 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes
  * sent from server side to client side.
  */
-@ExtendWith(TestLoggerExtension.class)
 class NettyMessageClientSideSerializationTest {
 
     private static final int BUFFER_SIZE = 1024;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
index 3aedfa3bd51..57f20e4c61f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
@@ -21,46 +21,45 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes
  * sent from client side to server side.
  */
-public class NettyMessageServerSideSerializationTest extends TestLogger {
+class NettyMessageServerSideSerializationTest {
 
     private final Random random = new Random();
 
     private EmbeddedChannel channel;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         channel =
                 new EmbeddedChannel(
                         new NettyMessage.NettyMessageEncoder(), // For 
outbound messages
                         new NettyMessage.NettyMessageDecoder()); // For 
inbound messages
     }
 
-    @After
-    public void tearDown() {
+    @AfterEach
+    void tearDown() {
         if (channel != null) {
             channel.close();
         }
     }
 
     @Test
-    public void testPartitionRequest() {
+    void testPartitionRequest() {
         NettyMessage.PartitionRequest expected =
                 new NettyMessage.PartitionRequest(
                         new ResultPartitionID(),
@@ -70,14 +69,14 @@ public class NettyMessageServerSideSerializationTest 
extends TestLogger {
 
         NettyMessage.PartitionRequest actual = encodeAndDecode(expected, 
channel);
 
-        assertEquals(expected.partitionId, actual.partitionId);
-        assertEquals(expected.queueIndex, actual.queueIndex);
-        assertEquals(expected.receiverId, actual.receiverId);
-        assertEquals(expected.credit, actual.credit);
+        assertThat(actual.partitionId).isEqualTo(expected.partitionId);
+        assertThat(actual.queueIndex).isEqualTo(expected.queueIndex);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
+        assertThat(actual.credit).isEqualTo(expected.credit);
     }
 
     @Test
-    public void testTaskEventRequest() {
+    void testTaskEventRequest() {
         NettyMessage.TaskEventRequest expected =
                 new NettyMessage.TaskEventRequest(
                         new IntegerTaskEvent(random.nextInt()),
@@ -85,65 +84,65 @@ public class NettyMessageServerSideSerializationTest 
extends TestLogger {
                         new InputChannelID());
         NettyMessage.TaskEventRequest actual = encodeAndDecode(expected, 
channel);
 
-        assertEquals(expected.event, actual.event);
-        assertEquals(expected.partitionId, actual.partitionId);
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.event).isEqualTo(expected.event);
+        assertThat(actual.partitionId).isEqualTo(expected.partitionId);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     @Test
-    public void testCancelPartitionRequest() {
+    void testCancelPartitionRequest() {
         NettyMessage.CancelPartitionRequest expected =
                 new NettyMessage.CancelPartitionRequest(new InputChannelID());
         NettyMessage.CancelPartitionRequest actual = encodeAndDecode(expected, 
channel);
 
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     @Test
-    public void testCloseRequest() {
+    void testCloseRequest() {
         NettyMessage.CloseRequest expected = new NettyMessage.CloseRequest();
         NettyMessage.CloseRequest actual = encodeAndDecode(expected, channel);
 
-        assertEquals(expected.getClass(), actual.getClass());
+        assertThat(actual).isExactlyInstanceOf(expected.getClass());
     }
 
     @Test
-    public void testAddCredit() {
+    void testAddCredit() {
         NettyMessage.AddCredit expected =
                 new NettyMessage.AddCredit(
                         random.nextInt(Integer.MAX_VALUE) + 1, new 
InputChannelID());
         NettyMessage.AddCredit actual = encodeAndDecode(expected, channel);
 
-        assertEquals(expected.credit, actual.credit);
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.credit).isEqualTo(expected.credit);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     @Test
-    public void testResumeConsumption() {
+    void testResumeConsumption() {
         NettyMessage.ResumeConsumption expected =
                 new NettyMessage.ResumeConsumption(new InputChannelID());
         NettyMessage.ResumeConsumption actual = encodeAndDecode(expected, 
channel);
 
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     @Test
-    public void testAckAllUserRecordsProcessed() {
+    void testAckAllUserRecordsProcessed() {
         NettyMessage.AckAllUserRecordsProcessed expected =
                 new NettyMessage.AckAllUserRecordsProcessed(new 
InputChannelID());
         NettyMessage.AckAllUserRecordsProcessed actual = 
encodeAndDecode(expected, channel);
 
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     @Test
-    public void testNewBufferSize() {
+    void testNewBufferSize() {
         NettyMessage.NewBufferSize expected =
                 new NettyMessage.NewBufferSize(
                         random.nextInt(Integer.MAX_VALUE), new 
InputChannelID());
         NettyMessage.NewBufferSize actual = encodeAndDecode(expected, channel);
 
-        assertEquals(expected.bufferSize, actual.bufferSize);
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.bufferSize).isEqualTo(expected.bufferSize);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index ccfef5f2442..191deba182a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -30,13 +30,15 @@ import 
org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -44,25 +46,20 @@ import java.net.InetSocketAddress;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link NettyPartitionRequestClient}. */
-@RunWith(Parameterized.class)
-public class NettyPartitionRequestClientTest {
-    @Parameterized.Parameter public boolean connectionReuseEnabled;
+@ExtendWith(ParameterizedTestExtension.class)
+class NettyPartitionRequestClientTest {
+    @Parameter private boolean connectionReuseEnabled;
 
-    @Parameterized.Parameters(name = "connection reuse enabled = {0}")
-    public static Object[] parameters() {
+    @Parameters(name = "connection reuse enabled = {0}")
+    private static Object[] parameters() {
         return new Object[][] {new Object[] {true}, new Object[] {false}};
     }
 
-    @Test
-    public void testPartitionRequestClientReuse() throws Exception {
+    @TestTemplate
+    void testPartitionRequestClientReuse() throws Exception {
         final CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
         final EmbeddedChannel channel = new EmbeddedChannel(handler);
@@ -76,11 +73,11 @@ public class NettyPartitionRequestClientTest {
         try {
             // Client should not be disposed in idle
             client.close(inputChannel);
-            assertFalse(client.canBeDisposed());
+            assertThat(client.canBeDisposed()).isFalse();
 
             // Client should be disposed in error
             handler.notifyAllChannelsOfErrorAndClose(new RuntimeException());
-            assertTrue(client.canBeDisposed());
+            assertThat(client.canBeDisposed()).isTrue();
         } finally {
             // Release all the buffer resources
             inputGate.close();
@@ -90,8 +87,8 @@ public class NettyPartitionRequestClientTest {
         }
     }
 
-    @Test
-    public void testRetriggerPartitionRequest() throws Exception {
+    @TestTemplate
+    void testRetriggerPartitionRequest() throws Exception {
         final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
 
         final CreditBasedPartitionRequestClientHandler handler =
@@ -120,13 +117,12 @@ public class NettyPartitionRequestClientTest {
             // first subpartition request
             inputChannel.requestSubpartition();
 
-            assertTrue(channel.isWritable());
+            assertThat(channel.isWritable()).isTrue();
             Object readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((PartitionRequest) readFromOutbound).receiverId);
-            assertEquals(numExclusiveBuffers, ((PartitionRequest) 
readFromOutbound).credit);
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
+            assertThat(((PartitionRequest) readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
+            assertThat(((PartitionRequest) 
readFromOutbound).credit).isEqualTo(numExclusiveBuffers);
 
             // retrigger subpartition request, e.g. due to failures
             inputGate.retriggerPartitionRequest(
@@ -135,11 +131,10 @@ public class NettyPartitionRequestClientTest {
             runAllScheduledPendingTasks(channel, deadline);
 
             readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((PartitionRequest) readFromOutbound).receiverId);
-            assertEquals(numExclusiveBuffers, ((PartitionRequest) 
readFromOutbound).credit);
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
+            assertThat(((PartitionRequest) readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
+            assertThat(((PartitionRequest) 
readFromOutbound).credit).isEqualTo(numExclusiveBuffers);
 
             // retrigger subpartition request once again, e.g. due to failures
             inputGate.retriggerPartitionRequest(
@@ -148,13 +143,12 @@ public class NettyPartitionRequestClientTest {
             runAllScheduledPendingTasks(channel, deadline);
 
             readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((PartitionRequest) readFromOutbound).receiverId);
-            assertEquals(numExclusiveBuffers, ((PartitionRequest) 
readFromOutbound).credit);
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
+            assertThat(((PartitionRequest) readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
+            assertThat(((PartitionRequest) 
readFromOutbound).credit).isEqualTo(numExclusiveBuffers);
 
-            assertNull(channel.readOutbound());
+            assertThat((Object) channel.readOutbound()).isNull();
         } finally {
             // Release all the buffer resources
             inputGate.close();
@@ -164,8 +158,8 @@ public class NettyPartitionRequestClientTest {
         }
     }
 
-    @Test
-    public void testDoublePartitionRequest() throws Exception {
+    @TestTemplate
+    void testDoublePartitionRequest() throws Exception {
         final CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
         final EmbeddedChannel channel = new EmbeddedChannel(handler);
@@ -185,15 +179,14 @@ public class NettyPartitionRequestClientTest {
             inputChannel.requestSubpartition();
 
             // The input channel should only send one partition request
-            assertTrue(channel.isWritable());
+            assertThat(channel.isWritable()).isTrue();
             Object readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((PartitionRequest) readFromOutbound).receiverId);
-            assertEquals(numExclusiveBuffers, ((PartitionRequest) 
readFromOutbound).credit);
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
+            assertThat(((PartitionRequest) readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
+            assertThat(((PartitionRequest) 
readFromOutbound).credit).isEqualTo(numExclusiveBuffers);
 
-            assertNull(channel.readOutbound());
+            assertThat((Object) channel.readOutbound()).isNull();
         } finally {
             // Release all the buffer resources
             inputGate.close();
@@ -203,8 +196,8 @@ public class NettyPartitionRequestClientTest {
         }
     }
 
-    @Test
-    public void testResumeConsumption() throws Exception {
+    @TestTemplate
+    void testResumeConsumption() throws Exception {
         final CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
         final EmbeddedChannel channel = new EmbeddedChannel(handler);
@@ -224,15 +217,14 @@ public class NettyPartitionRequestClientTest {
             inputChannel.resumeConsumption();
             channel.runPendingTasks();
             Object readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
 
             readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(ResumeConsumption.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((ResumeConsumption) readFromOutbound).receiverId);
+            assertThat(readFromOutbound).isInstanceOf(ResumeConsumption.class);
+            assertThat(((ResumeConsumption) readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
 
-            assertNull(channel.readOutbound());
+            assertThat((Object) channel.readOutbound()).isNull();
         } finally {
             // Release all the buffer resources
             inputGate.close();
@@ -242,8 +234,8 @@ public class NettyPartitionRequestClientTest {
         }
     }
 
-    @Test
-    public void testAcknowledgeAllRecordsProcessed() throws Exception {
+    @TestTemplate
+    void testAcknowledgeAllRecordsProcessed() throws Exception {
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
         EmbeddedChannel channel = new EmbeddedChannel(handler);
@@ -263,15 +255,15 @@ public class NettyPartitionRequestClientTest {
             inputChannel.acknowledgeAllRecordsProcessed();
             channel.runPendingTasks();
             Object readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+            assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class);
 
             readFromOutbound = channel.readOutbound();
-            assertThat(readFromOutbound, 
instanceOf(NettyMessage.AckAllUserRecordsProcessed.class));
-            assertEquals(
-                    inputChannel.getInputChannelId(),
-                    ((NettyMessage.AckAllUserRecordsProcessed) 
readFromOutbound).receiverId);
+            assertThat(readFromOutbound)
+                    
.isInstanceOf(NettyMessage.AckAllUserRecordsProcessed.class);
+            assertThat(((NettyMessage.AckAllUserRecordsProcessed) 
readFromOutbound).receiverId)
+                    .isEqualTo(inputChannel.getInputChannelId());
 
-            assertNull(channel.readOutbound());
+            assertThat((Object) channel.readOutbound()).isNull();
         } finally {
             // Release all the buffer resources
             inputGate.close();
@@ -281,7 +273,7 @@ public class NettyPartitionRequestClientTest {
         }
     }
 
-    private NettyPartitionRequestClient createPartitionRequestClient(
+    private static NettyPartitionRequestClient createPartitionRequestClient(
             Channel tcpChannel, NetworkClientHandler clientHandler, boolean 
connectionReuseEnabled)
             throws Exception {
         ConnectionID connectionID =
@@ -304,7 +296,7 @@ public class NettyPartitionRequestClientTest {
      * @param deadline maximum timestamp in ms to stop waiting further
      * @throws InterruptedException
      */
-    void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline)
+    private static void runAllScheduledPendingTasks(EmbeddedChannel channel, 
long deadline)
             throws InterruptedException {
         // NOTE: we don't have to be super fancy here; busy-polling with 1ms 
delays is enough
         while (channel.runScheduledPendingTasks() != -1 && 
System.currentTimeMillis() < deadline) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java
index ca27b8bd6e0..6b78aee13a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java
@@ -54,7 +54,7 @@ class NettyServerFromPortRangeTest {
         assertThat(listeningPort2).isEqualTo(port2.getPort());
     }
 
-    private NettyConfig getConfig(NetUtils.Port... ports) {
+    private static NettyConfig getConfig(NetUtils.Port... ports) {
         String portRangeStr =
                 Arrays.stream(ports)
                         .map(NetUtils.Port::getPort)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 42dc38c8e30..8605bcd1498 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -34,13 +34,12 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import static junit.framework.TestCase.assertEquals;
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test utility for Netty server and client setup. */
 public class NettyTestUtil {
@@ -198,7 +197,7 @@ public class NettyTestUtil {
         while ((encoded = channel.readOutbound()) != null) {
             msgNotEmpty = channel.writeInbound(encoded);
         }
-        assertTrue(msgNotEmpty);
+        assertThat(msgNotEmpty).isTrue();
 
         return channel.readInbound();
     }
@@ -208,20 +207,20 @@ public class NettyTestUtil {
     // 
---------------------------------------------------------------------------------------------
 
     static void verifyErrorResponse(ErrorResponse expected, ErrorResponse 
actual) {
-        assertEquals(expected.receiverId, actual.receiverId);
-        assertEquals(expected.cause.getClass(), actual.cause.getClass());
-        assertEquals(expected.cause.getMessage(), actual.cause.getMessage());
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
+        assertThat(actual.cause).hasSameClassAs(expected.cause);
+        
assertThat(actual.cause.getMessage()).isEqualTo(expected.cause.getMessage());
 
         if (expected.receiverId == null) {
-            assertTrue(actual.isFatalError());
+            assertThat(actual.isFatalError()).isTrue();
         }
     }
 
     static void verifyBufferResponseHeader(BufferResponse expected, 
BufferResponse actual) {
-        assertEquals(expected.backlog, actual.backlog);
-        assertEquals(expected.sequenceNumber, actual.sequenceNumber);
-        assertEquals(expected.bufferSize, actual.bufferSize);
-        assertEquals(expected.receiverId, actual.receiverId);
+        assertThat(actual.backlog).isEqualTo(expected.backlog);
+        assertThat(actual.sequenceNumber).isEqualTo(expected.sequenceNumber);
+        assertThat(actual.bufferSize).isEqualTo(expected.bufferSize);
+        assertThat(actual.receiverId).isEqualTo(expected.receiverId);
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index d923d76a6c9..15c5e435e61 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -22,10 +22,10 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -36,6 +36,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdap
 
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -47,26 +48,30 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 
 /** {@link PartitionRequestClientFactory} test. */
 @ExtendWith(ParameterizedTestExtension.class)
-public class PartitionRequestClientFactoryTest extends TestLogger {
+class PartitionRequestClientFactoryTest {
     private static final ResourceID RESOURCE_ID = ResourceID.generate();
 
-    @Parameter public boolean connectionReuseEnabled;
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> 
EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(() -> 
Executors.newFixedThreadPool(10));
+
+    @Parameter private boolean connectionReuseEnabled;
 
     @Parameters(name = "connectionReuseEnabled={0}")
-    public static Collection<Boolean> parameters() {
+    private static Collection<Boolean> parameters() {
         return Arrays.asList(false, true);
     }
 
@@ -163,7 +168,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                             RESOURCE_ID, (int) (Math.random() * 
Integer.MAX_VALUE));
             set.add(factory.createPartitionRequestClient(connectionID));
         }
-        assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
+        assertThat(set).hasSizeLessThanOrEqualTo(maxNumberOfConnections);
     }
 
     /**
@@ -288,45 +293,31 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                 new PartitionRequestClientFactory(
                         unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
-        List<Future<NettyPartitionRequestClient>> futures = new ArrayList<>();
+        List<CompletableFuture<NettyPartitionRequestClient>> futures = new 
ArrayList<>();
 
         for (int i = 0; i < 10; i++) {
-            Future<NettyPartitionRequestClient> future =
-                    threadPoolExecutor.submit(
+            futures.add(
+                    CompletableFuture.supplyAsync(
                             () -> {
-                                NettyPartitionRequestClient client = null;
                                 try {
-                                    client =
-                                            
factory.createPartitionRequestClient(
-                                                    
serverAndClient.getConnectionID(
-                                                            RESOURCE_ID, 0));
+                                    return 
factory.createPartitionRequestClient(
+                                            
serverAndClient.getConnectionID(RESOURCE_ID, 0));
                                 } catch (Exception e) {
-                                    fail(e.getMessage());
+                                    throw new CompletionException(e);
                                 }
-                                return client;
-                            });
-
-            futures.add(future);
+                            },
+                            EXECUTOR_EXTENSION.getExecutor()));
         }
 
         futures.forEach(
-                runnableFuture -> {
-                    NettyPartitionRequestClient client;
-                    try {
-                        client = runnableFuture.get();
-                        assertThat(client).isNotNull();
-                    } catch (Exception e) {
-                        System.out.println(e.getMessage());
-                        fail();
-                    }
-                });
+                runnableFuture ->
+                        
assertThatFuture(runnableFuture).eventuallySucceeds().isNotNull());
 
-        threadPoolExecutor.shutdown();
         shutdown(serverAndClient);
     }
 
-    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() 
throws Exception {
+    private static NettyTestUtil.NettyServerAndClient 
createNettyServerAndClient()
+            throws Exception {
         return NettyTestUtil.initServerAndClient(
                 new NettyProtocol(null, null) {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 1a16710a811..e113f817d4b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -46,48 +46,38 @@ import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link PartitionRequestQueue}. */
-public class PartitionRequestQueueTest {
-
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+class PartitionRequestQueueTest {
 
     private static final int BUFFER_SIZE = 1024 * 1024;
 
     private static FileChannelManager fileChannelManager;
 
-    @BeforeClass
-    public static void setUp() throws Exception {
+    @BeforeAll
+    static void setUp(@TempDir File temporaryFolder) {
         fileChannelManager =
                 new FileChannelManagerImpl(
-                        new String[] 
{TEMPORARY_FOLDER.newFolder().getAbsolutePath()}, "testing");
+                        new String[] {temporaryFolder.getAbsolutePath()}, 
"testing");
     }
 
-    @AfterClass
-    public static void shutdown() throws Exception {
+    @AfterAll
+    static void shutdown() throws Exception {
         fileChannelManager.close();
     }
 
@@ -103,13 +93,12 @@ public class PartitionRequestQueueTest {
         reader.requestSubpartitionViewOrRegisterListener(
                 resultPartitionManager, resultPartitionId, 0);
 
-        assertEquals(
-                resultPartitionManager
-                        .getListenerManagers()
-                        .get(resultPartitionId)
-                        .getPartitionRequestListeners()
-                        .size(),
-                1);
+        assertThat(
+                        resultPartitionManager
+                                .getListenerManagers()
+                                .get(resultPartitionId)
+                                .getPartitionRequestListeners())
+                .hasSize(1);
 
         reader.notifyPartitionRequestTimeout(
                 resultPartitionManager
@@ -122,11 +111,12 @@ public class PartitionRequestQueueTest {
         channel.runPendingTasks();
 
         Object read = channel.readOutbound();
-        assertNotNull(read);
-        assertThat(read, instanceOf(NettyMessage.ErrorResponse.class));
-        assertThat(
-                ((NettyMessage.ErrorResponse) read).cause,
-                instanceOf(PartitionNotFoundException.class));
+        assertThat(read)
+                .isNotNull()
+                .isInstanceOf(NettyMessage.ErrorResponse.class)
+                .isInstanceOfSatisfying(
+                        NettyMessage.ErrorResponse.class,
+                        r -> 
assertThat(r.cause).isInstanceOf(PartitionNotFoundException.class));
     }
 
     /**
@@ -135,7 +125,7 @@ public class PartitionRequestQueueTest {
      * messages.
      */
     @Test
-    public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
+    void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
         final int buffersToWrite = 5;
         PartitionRequestQueue queue = new PartitionRequestQueue();
         EmbeddedChannel channel = new EmbeddedChannel(queue);
@@ -153,11 +143,11 @@ public class PartitionRequestQueueTest {
                         .build(),
                 0);
         reader1.notifyDataAvailable();
-        assertTrue(reader1.getAvailabilityAndBacklog().isAvailable());
-        assertFalse(reader1.isRegisteredAsAvailable());
+        assertThat(reader1.getAvailabilityAndBacklog().isAvailable()).isTrue();
+        assertThat(reader1.isRegisteredAsAvailable()).isFalse();
 
         channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
-        assertFalse(channel.isWritable());
+        assertThat(channel.isWritable()).isFalse();
 
         reader1.notifyDataAvailable();
         channel.runPendingTasks();
@@ -170,26 +160,26 @@ public class PartitionRequestQueueTest {
                                         new 
DefaultBufferResultSubpartitionView(buffersToWrite))
                         .build(),
                 0);
-        assertTrue(reader2.getAvailabilityAndBacklog().isAvailable());
-        assertFalse(reader2.isRegisteredAsAvailable());
+        assertThat(reader2.getAvailabilityAndBacklog().isAvailable()).isTrue();
+        assertThat(reader2.isRegisteredAsAvailable()).isFalse();
 
         reader2.notifyDataAvailable();
 
         // changing a channel writability should result in draining both 
reader1 and reader2
         channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
         channel.runPendingTasks();
-        assertEquals(buffersToWrite, channel.outboundMessages().size());
+        assertThat(channel.outboundMessages()).hasSize(buffersToWrite);
     }
 
     /** Tests {@link PartitionRequestQueue} buffer writing with default 
buffers. */
     @Test
-    public void testDefaultBufferWriting() throws Exception {
+    void testDefaultBufferWriting() throws Exception {
         testBufferWriting(new DefaultBufferResultSubpartitionView(1));
     }
 
     /** Tests {@link PartitionRequestQueue} buffer writing with read-only 
buffers. */
     @Test
-    public void testReadOnlyBufferWriting() throws Exception {
+    void testReadOnlyBufferWriting() throws Exception {
         testBufferWriting(new ReadOnlyBufferResultSubpartitionView(1));
     }
 
@@ -214,13 +204,13 @@ public class PartitionRequestQueueTest {
         channel.runPendingTasks();
 
         Object read = channel.readOutbound();
-        assertNotNull(read);
+        assertThat(read).isNotNull();
         if (read instanceof NettyMessage.ErrorResponse) {
             ((NettyMessage.ErrorResponse) read).cause.printStackTrace();
         }
-        assertThat(read, instanceOf(NettyMessage.BufferResponse.class));
+        assertThat(read).isInstanceOf(NettyMessage.BufferResponse.class);
         read = channel.readOutbound();
-        assertNull(read);
+        assertThat(read).isNull();
     }
 
     private static class DefaultBufferResultSubpartitionView extends 
NoOpResultSubpartitionView {
@@ -287,7 +277,7 @@ public class PartitionRequestQueueTest {
      * even though it has no available credits.
      */
     @Test
-    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+    void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
         // setup
         final ResultSubpartitionView view = new 
NextIsEventResultSubpartitionView();
 
@@ -306,7 +296,7 @@ public class PartitionRequestQueueTest {
 
         // block the channel so that we see an intermediate state in the test
         ByteBuf channelBlockingBuffer = blockChannel(channel);
-        assertNull(channel.readOutbound());
+        assertThat((Object) channel.readOutbound()).isNull();
 
         // Notify an available event buffer to trigger enqueue the reader
         reader.notifyDataAvailable();
@@ -315,16 +305,16 @@ public class PartitionRequestQueueTest {
 
         // The reader is enqueued in the pipeline because the next buffer is 
an event, even though
         // no credits are available
-        assertThat(queue.getAvailableReaders(), contains(reader)); // contains 
only (this) one!
-        assertEquals(0, reader.getNumCreditsAvailable());
+        assertThat(queue.getAvailableReaders()).containsExactly(reader); // contains 
only (this) one!
+        assertThat(reader.getNumCreditsAvailable()).isZero();
 
         // Flush the buffer to make the channel writable again and see the 
final results
         channel.flush();
-        assertSame(channelBlockingBuffer, channel.readOutbound());
+        assertThat((ByteBuf) 
channel.readOutbound()).isSameAs(channelBlockingBuffer);
 
-        assertEquals(0, queue.getAvailableReaders().size());
-        assertEquals(0, reader.getNumCreditsAvailable());
-        assertNull(channel.readOutbound());
+        assertThat(queue.getAvailableReaders()).isEmpty();
+        assertThat(reader.getNumCreditsAvailable()).isZero();
+        assertThat((Object) channel.readOutbound()).isNull();
     }
 
     private static class NextIsEventResultSubpartitionView extends 
NoOpResultSubpartitionView {
@@ -340,7 +330,7 @@ public class PartitionRequestQueueTest {
      * buffers.
      */
     @Test
-    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception 
{
+    void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
         // setup
         final ResultSubpartitionView view = new 
DefaultBufferResultSubpartitionView(10);
 
@@ -361,7 +351,7 @@ public class PartitionRequestQueueTest {
 
         // block the channel so that we see an intermediate state in the test
         ByteBuf channelBlockingBuffer = blockChannel(channel);
-        assertNull(channel.readOutbound());
+        assertThat((Object) channel.readOutbound()).isNull();
 
         // Notify available buffers to trigger enqueue the reader
         final int notifyNumBuffers = 5;
@@ -373,10 +363,10 @@ public class PartitionRequestQueueTest {
 
         // the reader is not enqueued in the pipeline because no credits are 
available
         // -> it should still have the same number of pending buffers
-        assertEquals(0, queue.getAvailableReaders().size());
-        assertTrue(reader.hasBuffersAvailable().isAvailable());
-        assertFalse(reader.isRegisteredAsAvailable());
-        assertEquals(0, reader.getNumCreditsAvailable());
+        assertThat(queue.getAvailableReaders()).isEmpty();
+        assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
+        assertThat(reader.isRegisteredAsAvailable()).isFalse();
+        assertThat(reader.getNumCreditsAvailable()).isZero();
 
         // Notify available credits to trigger enqueue the reader again
         final int notifyNumCredits = 3;
@@ -388,24 +378,25 @@ public class PartitionRequestQueueTest {
             // since the channel is blocked though, we will not process anything and only enqueue
             // the
             // reader once
-            assertTrue(reader.isRegisteredAsAvailable());
-            assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one!
-            assertEquals(i, reader.getNumCreditsAvailable());
-            assertTrue(reader.hasBuffersAvailable().isAvailable());
+            assertThat(reader.isRegisteredAsAvailable()).isTrue();
+            assertThat(queue.getAvailableReaders()).contains(reader); // contains only (this) one!
+            assertThat(reader.getNumCreditsAvailable()).isEqualTo(i);
+            assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
         }
 
         // Flush the buffer to make the channel writable again and see the final results
         channel.flush();
-        assertSame(channelBlockingBuffer, channel.readOutbound());
+        assertThat((ByteBuf) channel.readOutbound()).isSameAs(channelBlockingBuffer);
 
-        assertEquals(0, queue.getAvailableReaders().size());
-        assertEquals(0, reader.getNumCreditsAvailable());
-        assertTrue(reader.hasBuffersAvailable().isAvailable());
-        assertFalse(reader.isRegisteredAsAvailable());
+        assertThat(queue.getAvailableReaders()).isEmpty();
+        assertThat(reader.getNumCreditsAvailable()).isZero();
+        assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
+        assertThat(reader.isRegisteredAsAvailable()).isFalse();
         for (int i = 1; i <= notifyNumCredits; i++) {
-            assertThat(channel.readOutbound(), instanceOf(NettyMessage.BufferResponse.class));
+            assertThat((Object) channel.readOutbound())
+                    .isInstanceOf(NettyMessage.BufferResponse.class);
         }
-        assertNull(channel.readOutbound());
+        assertThat((Object) channel.readOutbound()).isNull();
     }
 
     /**
@@ -414,7 +405,7 @@ public class PartitionRequestQueueTest {
      * there are credit and data available.
      */
     @Test
-    public void testEnqueueReaderByResumingConsumption() throws Exception {
+    void testEnqueueReaderByResumingConsumption() throws Exception {
         PipelinedSubpartition subpartition =
                 PipelinedSubpartitionTest.createPipelinedSubpartition();
         Buffer.DataType dataType1 = Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER;
@@ -437,26 +428,26 @@ public class PartitionRequestQueueTest {
 
         reader.notifySubpartitionCreated(partition, 0);
         queue.notifyReaderCreated(reader);
-        assertTrue(reader.getAvailabilityAndBacklog().isAvailable());
+        assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isTrue();
 
         reader.notifyDataAvailable();
         channel.runPendingTasks();
-        assertFalse(reader.getAvailabilityAndBacklog().isAvailable());
-        assertEquals(1, subpartition.unsynchronizedGetNumberOfQueuedBuffers());
+        assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse();
+        assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isOne();
 
         queue.addCreditOrResumeConsumption(
                 receiverId, NetworkSequenceViewReader::resumeConsumption);
-        assertFalse(reader.getAvailabilityAndBacklog().isAvailable());
-        assertEquals(0, subpartition.unsynchronizedGetNumberOfQueuedBuffers());
+        assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse();
+        assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
 
         Object data1 = channel.readOutbound();
-        assertEquals(dataType1, ((NettyMessage.BufferResponse) data1).buffer.getDataType());
+        assertThat(((NettyMessage.BufferResponse) data1).buffer.getDataType()).isEqualTo(dataType1);
         Object data2 = channel.readOutbound();
-        assertEquals(dataType2, ((NettyMessage.BufferResponse) data2).buffer.getDataType());
+        assertThat(((NettyMessage.BufferResponse) data2).buffer.getDataType()).isEqualTo(dataType2);
     }
 
     @Test
-    public void testAnnounceBacklog() throws Exception {
+    void testAnnounceBacklog() throws Exception {
         PipelinedSubpartition subpartition =
                 PipelinedSubpartitionTest.createPipelinedSubpartition();
         subpartition.add(createEventBufferConsumer(4096, Buffer.DataType.DATA_BUFFER));
@@ -481,24 +472,24 @@ public class PartitionRequestQueueTest {
         reader.notifyDataAvailable();
         channel.runPendingTasks();
         Object data = channel.readOutbound();
-        assertTrue(data instanceof NettyMessage.BacklogAnnouncement);
+        assertThat(data).isInstanceOf(NettyMessage.BacklogAnnouncement.class);
         NettyMessage.BacklogAnnouncement announcement = (NettyMessage.BacklogAnnouncement) data;
-        assertEquals(receiverId, announcement.receiverId);
-        assertEquals(subpartition.getBuffersInBacklogUnsafe(), announcement.backlog);
+        assertThat(announcement.receiverId).isEqualTo(receiverId);
+        assertThat(announcement.backlog).isEqualTo(subpartition.getBuffersInBacklogUnsafe());
 
         subpartition.release();
         reader.notifyDataAvailable();
         channel.runPendingTasks();
-        assertNotNull(channel.readOutbound());
+        assertThat((Object) channel.readOutbound()).isNotNull();
     }
 
     @Test
-    public void testCancelPartitionRequestForUnavailableView() throws Exception {
+    void testCancelPartitionRequestForUnavailableView() throws Exception {
         testCancelPartitionRequest(false);
     }
 
     @Test
-    public void testCancelPartitionRequestForAvailableView() throws Exception {
+    void testCancelPartitionRequestForAvailableView() throws Exception {
         testCancelPartitionRequest(true);
     }
 
@@ -522,18 +513,18 @@ public class PartitionRequestQueueTest {
         // add credit to make this reader available for adding into availableReaders queue
         if (isAvailableView) {
             queue.addCreditOrResumeConsumption(receiverId, viewReader -> viewReader.addCredit(1));
-            assertTrue(queue.getAvailableReaders().contains(reader));
+            assertThat(queue.getAvailableReaders()).contains(reader);
         }
 
         // cancel this subpartition view
         queue.cancel(receiverId);
         channel.runPendingTasks();
 
-        assertFalse(queue.getAvailableReaders().contains(reader));
+        assertThat(queue.getAvailableReaders()).doesNotContain(reader);
 
         // the reader view should be released (the partition is not, though, blocking partitions
         // support multiple successive readers for recovery and caching)
-        assertTrue(reader.isReleased());
+        assertThat(reader.isReleased()).isTrue();
 
         // cleanup
         partition.release();
@@ -541,7 +532,7 @@ public class PartitionRequestQueueTest {
     }
 
     @Test
-    public void testNotifyNewBufferSize() throws Exception {
+    void testNotifyNewBufferSize() throws Exception {
         // given: Result partition and the reader for subpartition 0.
         ResultPartition parent = createResultPartition();
 
@@ -575,10 +566,10 @@ public class PartitionRequestQueueTest {
         // then: Buffers of received size will be in outbound channel.
         Object data1 = channel.readOutbound();
         // The size can not be less than the first record in buffer.
-        assertEquals(128, ((NettyMessage.BufferResponse) data1).buffer.getSize());
+        assertThat(((NettyMessage.BufferResponse) data1).buffer.getSize()).isEqualTo(128);
         Object data2 = channel.readOutbound();
         // The size should shrink up to notified buffer size.
-        assertEquals(65, ((NettyMessage.BufferResponse) data2).buffer.getSize());
+        assertThat(((NettyMessage.BufferResponse) data2).buffer.getSize()).isEqualTo(65);
     }
 
     private static ResultPartition createResultPartition() throws IOException {
@@ -629,7 +620,7 @@ public class PartitionRequestQueueTest {
         // to the wire although the buffer is "empty".
         ByteBuf channelBlockingBuffer = Unpooled.buffer(highWaterMark).writerIndex(highWaterMark);
         channel.write(channelBlockingBuffer);
-        assertFalse(channel.isWritable());
+        assertThat(channel.isWritable()).isFalse();
 
         return channelBlockingBuffer;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
index d7941ef352d..edc349b82ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
@@ -27,24 +27,21 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link PartitionRequestServerHandler}. */
-public class PartitionRequestServerHandlerTest extends TestLogger {
+class PartitionRequestServerHandlerTest {
 
     @Test
-    public void testResumeConsumption() {
+    void testResumeConsumption() {
         final InputChannelID inputChannelID = new InputChannelID();
         final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
         final TestViewReader testViewReader =
@@ -61,11 +58,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger {
         channel.writeInbound(new ResumeConsumption(inputChannelID));
         channel.runPendingTasks();
 
-        assertTrue(testViewReader.consumptionResumed);
+        assertThat(testViewReader.consumptionResumed).isTrue();
     }
 
     @Test
-    public void testAcknowledgeAllRecordsProcessed() throws IOException {
+    void testAcknowledgeAllRecordsProcessed() throws IOException {
         InputChannelID inputChannelID = new InputChannelID();
 
         ResultPartition resultPartition =
@@ -91,12 +88,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger {
         resultPartition.notifyEndOfData(StopMode.DRAIN);
         CompletableFuture<Void> allRecordsProcessedFuture =
                 resultPartition.getAllDataProcessedFuture();
-        assertFalse(allRecordsProcessedFuture.isDone());
+        assertThat(allRecordsProcessedFuture).isNotDone();
         channel.writeInbound(new NettyMessage.AckAllUserRecordsProcessed(inputChannelID));
         channel.runPendingTasks();
 
-        assertTrue(allRecordsProcessedFuture.isDone());
-        assertFalse(allRecordsProcessedFuture.isCompletedExceptionally());
+        assertThat(allRecordsProcessedFuture).isDone().isNotCompletedExceptionally();
     }
 
     @Test
@@ -117,11 +113,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger {
         channel.writeInbound(new NettyMessage.NewBufferSize(666, inputChannelID));
         channel.runPendingTasks();
 
-        assertEquals(666, testViewReader.bufferSize);
+        assertThat(testViewReader.bufferSize).isEqualTo(666);
     }
 
     @Test
-    public void testReceivingNewBufferSizeBeforeReaderIsCreated() {
+    void testReceivingNewBufferSizeBeforeReaderIsCreated() {
         final InputChannelID inputChannelID = new InputChannelID();
         final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
         final TestViewReader testViewReader =
@@ -138,10 +134,12 @@ public class PartitionRequestServerHandlerTest extends TestLogger {
         channel.runPendingTasks();
 
         // If error happens outbound messages would be not empty.
-        assertTrue(channel.outboundMessages().toString(), channel.outboundMessages().isEmpty());
+        assertThat(channel.outboundMessages())
+                .withFailMessage(channel.outboundMessages().toString())
+                .isEmpty();
 
         // New buffer size should be silently ignored because it is possible situation.
-        assertEquals(-1, testViewReader.bufferSize);
+        assertThat(testViewReader.bufferSize).isEqualTo(-1);
     }
 
     private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 78bfa73e600..f5fdaf7a1b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.stubbing.Answer;
 
 import java.util.Optional;
@@ -43,17 +43,17 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class ServerTransportErrorHandlingTest {
+class ServerTransportErrorHandlingTest {
 
     /** Verifies remote closes trigger the release of all resources. */
     @Test
-    public void testRemoteClose() throws Exception {
+    void testRemoteClose() throws Exception {
         final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
 
         final CountDownLatch sync = new CountDownLatch(1);
@@ -103,12 +103,12 @@ public class ServerTransportErrorHandlingTest {
                             new ResultPartitionID(), 0, new InputChannelID(), Integer.MAX_VALUE));
 
             // Wait for the notification
-            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
-                fail(
-                        "Timed out after waiting for "
-                                + TestingUtils.TESTING_DURATION.toMillis()
-                                + " ms to be notified about released partition.");
-            }
+            assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
+                    .withFailMessage(
+                            "Timed out after waiting for "
+                                    + TestingUtils.TESTING_DURATION.toMillis()
+                                    + " ms to be notified about released partition.")
+                    .isTrue();
         } finally {
             shutdown(serverAndClient);
         }

Reply via email to