This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1c884ab4837 [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.netty package of flink-runtime module 1c884ab4837 is described below commit 1c884ab48372f7a66f86c28aeaf9518000c7f357 Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Fri Dec 15 14:57:18 2023 +0800 [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.netty package of flink-runtime module This closes #23607. --- .../runtime/io/network/netty/ByteBufUtilsTest.java | 42 +++-- .../network/netty/CancelPartitionRequestTest.java | 38 ++--- .../netty/ClientTransportErrorHandlingTest.java | 8 +- ...editBasedPartitionRequestClientHandlerTest.java | 10 +- ...CreditBasedSequenceNumberingViewReaderTest.java | 32 ++-- .../io/network/netty/NettyBufferPoolTest.java | 51 +++--- .../io/network/netty/NettyClientServerSslTest.java | 123 +++++++------- .../runtime/io/network/netty/NettyClientTest.java | 2 +- .../network/netty/NettyConnectionManagerTest.java | 38 ++--- .../NettyMessageClientDecoderDelegateTest.java | 39 +++-- .../NettyMessageClientSideSerializationTest.java | 3 - .../NettyMessageServerSideSerializationTest.java | 65 ++++---- .../netty/NettyPartitionRequestClientTest.java | 120 +++++++------- .../netty/NettyServerFromPortRangeTest.java | 2 +- .../runtime/io/network/netty/NettyTestUtil.java | 21 ++- .../netty/PartitionRequestClientFactoryTest.java | 57 +++---- .../network/netty/PartitionRequestQueueTest.java | 177 ++++++++++----------- .../netty/PartitionRequestServerHandlerTest.java | 30 ++-- .../netty/ServerTransportErrorHandlingTest.java | 20 +-- 19 files changed, 420 insertions(+), 458 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java index 4fcf083f3db..146057025ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java @@ -18,24 +18,20 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.util.TestLogger; - import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; +import static org.assertj.core.api.Assertions.assertThat; /** Tests the methods in {@link ByteBufUtils}. */ -public class ByteBufUtilsTest extends TestLogger { +class ByteBufUtilsTest { private static final byte ACCUMULATION_BYTE = 0x7d; private static final byte NON_ACCUMULATION_BYTE = 0x23; @Test - public void testAccumulateWithoutCopy() { + void testAccumulateWithoutCopy() { int sourceLength = 128; int sourceReaderIndex = 32; int expectedAccumulationSize = 16; @@ -49,13 +45,13 @@ public class ByteBufUtilsTest extends TestLogger { ByteBufUtils.accumulate( target, src, expectedAccumulationSize, target.readableBytes()); - assertSame(src, accumulated); - assertEquals(sourceReaderIndex, src.readerIndex()); + assertThat(accumulated).isSameAs(src); + assertThat(src.readerIndex()).isEqualTo(sourceReaderIndex); verifyBufferContent(src, sourceReaderIndex, expectedAccumulationSize); } @Test - public void testAccumulateWithCopy() { + void testAccumulateWithCopy() { int sourceLength = 128; int firstSourceReaderIndex = 32; int secondSourceReaderIndex = 0; @@ -76,9 +72,9 @@ public class ByteBufUtilsTest extends TestLogger { ByteBuf accumulated = ByteBufUtils.accumulate( target, firstSource, expectedAccumulationSize, target.readableBytes()); - assertNull(accumulated); - assertEquals(sourceLength, firstSource.readerIndex()); - assertEquals(firstAccumulationSize, target.readableBytes()); + assertThat(accumulated).isNull(); + assertThat(firstSource.readerIndex()).isEqualTo(sourceLength); + assertThat(target.readableBytes()).isEqualTo(firstAccumulationSize); // The remaining data will be copied from the second buffer, and the target buffer will be // returned @@ -86,9 +82,10 @@ public class ByteBufUtilsTest extends TestLogger { accumulated = ByteBufUtils.accumulate( target, secondSource, expectedAccumulationSize, target.readableBytes()); - assertSame(target, accumulated); - assertEquals(secondSourceReaderIndex + secondAccumulationSize, secondSource.readerIndex()); - assertEquals(expectedAccumulationSize, target.readableBytes()); + assertThat(accumulated).isSameAs(target); + assertThat(secondSource.readerIndex()) + .isEqualTo(secondSourceReaderIndex + secondAccumulationSize); + assertThat(target.readableBytes()).isEqualTo(expectedAccumulationSize); verifyBufferContent(accumulated, 0, expectedAccumulationSize); } @@ -103,7 +100,7 @@ public class ByteBufUtilsTest extends TestLogger { * @param accumulationSize The size of bytes that will be read for accumulating. * @return The required source buffer. */ - private ByteBuf createSourceBuffer(int size, int readerIndex, int accumulationSize) { + private static ByteBuf createSourceBuffer(int size, int readerIndex, int accumulationSize) { ByteBuf buf = Unpooled.buffer(size); for (int i = 0; i < readerIndex; i++) { @@ -122,13 +119,12 @@ public class ByteBufUtilsTest extends TestLogger { return buf; } - private void verifyBufferContent(ByteBuf buf, int start, int length) { + private static void verifyBufferContent(ByteBuf buf, int start, int length) { for (int i = 0; i < length; ++i) { byte b = buf.getByte(start + i); - assertEquals( - String.format("The byte at position %d is not right.", start + i), - ACCUMULATION_BYTE, - b); + assertThat(b) + .withFailMessage("The byte at position %d is not right.", start + i) + .isEqualTo(ACCUMULATION_BYTE); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 7bfa0d28f85..a07e6d2aedc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -34,7 +34,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import javax.annotation.Nullable; @@ -50,16 +50,16 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class CancelPartitionRequestTest { +class CancelPartitionRequestTest { /** * Verifies that requests for non-existing (failed/cancelled) input channels are properly @@ -67,7 +67,7 @@ public class CancelPartitionRequestTest { * This should cancel the request. */ @Test - public void testCancelPartitionRequest() throws Exception { + void testCancelPartitionRequest() throws Exception { NettyServerAndClient serverAndClient = null; @@ -104,12 +104,12 @@ public class CancelPartitionRequestTest { .await(); // Wait for the notification - if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) { - fail( - "Timed out after waiting for " - + TestingUtils.TESTING_DURATION.toMillis() - + " ms to be notified about cancelled partition."); - } + assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) + .withFailMessage( + "Timed out after waiting for " + + TestingUtils.TESTING_DURATION.toMillis() + + " ms to be notified about cancelled partition.") + .isTrue(); verify(view, times(1)).releaseAllResources(); } finally { @@ -118,7 +118,7 @@ public class CancelPartitionRequestTest { } @Test - public void testDuplicateCancel() throws Exception { + void testDuplicateCancel() throws Exception { NettyServerAndClient serverAndClient = null; @@ -163,12 +163,12 @@ public class CancelPartitionRequestTest { .await(); // Wait for the notification - if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) { - fail( - "Timed out after waiting for " - + TestingUtils.TESTING_DURATION.toMillis() - + " ms to be notified about cancelled partition."); - } + assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) + .withFailMessage( + "Timed out after waiting for " + + TestingUtils.TESTING_DURATION.toMillis() + + " ms to be notified about cancelled partition.") + .isTrue(); ch.writeAndFlush(new CancelPartitionRequest(inputChannelId)).await(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 47d312f5ffa..978065c48a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -370,7 +370,7 @@ class ClientTransportErrorHandlingTest { // Helpers // --------------------------------------------------------------------------------------------- - private EmbeddedChannel createEmbeddedChannel() { + private static EmbeddedChannel createEmbeddedChannel() { NettyProtocol protocol = new NettyProtocol( mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)); @@ -378,7 +378,7 @@ class ClientTransportErrorHandlingTest { return new EmbeddedChannel(protocol.getClientChannelHandlers()); } - private RemoteInputChannel addInputChannel(NetworkClientHandler clientHandler) + private static RemoteInputChannel addInputChannel(NetworkClientHandler clientHandler) throws IOException { RemoteInputChannel rich = createRemoteInputChannel(); clientHandler.addInputChannel(rich); @@ -386,13 +386,13 @@ class ClientTransportErrorHandlingTest { return rich; } - private NetworkClientHandler getClientHandler(Channel ch) { + private static NetworkClientHandler getClientHandler(Channel ch) { NetworkClientHandler networkClientHandler = ch.pipeline().get(NetworkClientHandler.class); networkClientHandler.setConnectionId(CONNECTION_ID); return networkClientHandler; } - private RemoteInputChannel createRemoteInputChannel() { + private static RemoteInputChannel createRemoteInputChannel() { return when(mock(RemoteInputChannel.class).getInputChannelId()) .thenReturn(new InputChannelID()) .getMock(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java index f46c25ff5a2..0048cde0b13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java @@ -71,7 +71,7 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -191,7 +191,7 @@ class CreditBasedPartitionRequestClientHandlerTest { new NetworkBufferAllocator(handler)); handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse); - assertThat(inputChannel.getNumberOfQueuedBuffers()).isEqualTo(1); + assertThat(inputChannel.getNumberOfQueuedBuffers()).isOne(); assertThat(inputChannel.getSenderBacklog()).isEqualTo(2); } finally { releaseResource(inputGate, networkBufferPool); @@ -307,7 +307,7 @@ class CreditBasedPartitionRequestClientHandlerTest { assertThat(inputChannel.getNumberOfAvailableBuffers()) .as("There should be no buffers available in the channel.") - .isEqualTo(0); + .isZero(); final BufferResponse bufferResponse = createBufferResponse( @@ -471,7 +471,7 @@ class CreditBasedPartitionRequestClientHandlerTest { allocator); handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse3); - assertThat(inputChannels[0].getUnannouncedCredit()).isEqualTo(1); + assertThat(inputChannels[0].getUnannouncedCredit()).isOne(); assertThat(inputChannels[1].getUnannouncedCredit()).isZero(); channel.runPendingTasks(); @@ -488,7 +488,7 @@ class CreditBasedPartitionRequestClientHandlerTest { assertThat(channel.isWritable()).isTrue(); readFromOutbound = channel.readOutbound(); assertThat(readFromOutbound).isInstanceOf(AddCredit.class); - assertThat(((AddCredit) readFromOutbound).credit).isEqualTo(1); + assertThat(((AddCredit) readFromOutbound).credit).isOne(); assertThat(inputChannels[0].getUnannouncedCredit()).isZero(); assertThat(inputChannels[1].getUnannouncedCredit()).isZero(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java index e8a6c707044..30508cba028 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReaderTest.java @@ -24,58 +24,56 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link CreditBasedSequenceNumberingViewReader}. */ -public class CreditBasedSequenceNumberingViewReaderTest { +class CreditBasedSequenceNumberingViewReaderTest { @Test - public void testResumeConsumption() throws Exception { + void testResumeConsumption() throws Exception { int numCredits = 2; CreditBasedSequenceNumberingViewReader reader1 = createNetworkSequenceViewReader(numCredits); reader1.resumeConsumption(); - assertEquals(numCredits, reader1.getNumCreditsAvailable()); + assertThat(reader1.getNumCreditsAvailable()).isEqualTo(numCredits); reader1.addCredit(numCredits); reader1.resumeConsumption(); - assertEquals(2 * numCredits, reader1.getNumCreditsAvailable()); + assertThat(reader1.getNumCreditsAvailable()).isEqualTo(2 * numCredits); CreditBasedSequenceNumberingViewReader reader2 = createNetworkSequenceViewReader(0); reader2.addCredit(numCredits); - assertEquals(numCredits, reader2.getNumCreditsAvailable()); + assertThat(reader2.getNumCreditsAvailable()).isEqualTo(numCredits); reader2.resumeConsumption(); - assertEquals(0, reader2.getNumCreditsAvailable()); + assertThat(reader2.getNumCreditsAvailable()).isZero(); } @Test - public void testNeedAnnounceBacklog() throws Exception { + void testNeedAnnounceBacklog() throws Exception { int numCredits = 2; CreditBasedSequenceNumberingViewReader reader1 = createNetworkSequenceViewReader(numCredits); - assertFalse(reader1.needAnnounceBacklog()); + assertThat(reader1.needAnnounceBacklog()).isFalse(); reader1.addCredit(-numCredits); - assertFalse(reader1.needAnnounceBacklog()); + assertThat(reader1.needAnnounceBacklog()).isFalse(); CreditBasedSequenceNumberingViewReader reader2 = createNetworkSequenceViewReader(0); - assertTrue(reader2.needAnnounceBacklog()); + assertThat(reader2.needAnnounceBacklog()).isTrue(); reader2.addCredit(numCredits); - assertFalse(reader2.needAnnounceBacklog()); + assertThat(reader2.needAnnounceBacklog()).isFalse(); reader2.addCredit(-numCredits); - assertTrue(reader2.needAnnounceBacklog()); + assertThat(reader2.needAnnounceBacklog()).isTrue(); } - private CreditBasedSequenceNumberingViewReader createNetworkSequenceViewReader( + private static CreditBasedSequenceNumberingViewReader createNetworkSequenceViewReader( int initialCredit) throws Exception { PartitionRequestQueue queue = new PartitionRequestQueue(); EmbeddedChannel channel = new EmbeddedChannel(queue); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java index 0aaa11ab64c..c66d8a33390 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java @@ -20,22 +20,21 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.junit.After; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link NettyBufferPool} wrapper. */ -public class NettyBufferPoolTest { +class NettyBufferPoolTest { private final List<ByteBuf> needReleasing = new ArrayList<>(); - @After - public void tearDown() { + @AfterEach + void tearDown() { try { // Release all of the buffers. for (ByteBuf buf : needReleasing) { @@ -44,7 +43,7 @@ public class NettyBufferPoolTest { // Checks in a separate loop in case we have sliced buffers. for (ByteBuf buf : needReleasing) { - assertEquals(0, buf.refCnt()); + assertThat(buf.refCnt()).isZero(); } } finally { needReleasing.clear(); @@ -52,49 +51,49 @@ public class NettyBufferPoolTest { } @Test - public void testNoHeapAllocations() throws Exception { + void testNoHeapAllocations() { final NettyBufferPool nettyBufferPool = new NettyBufferPool(1); // Buffers should prefer to be direct - assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect()); - assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect()); - assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect()); + assertThat(releaseLater(nettyBufferPool.buffer()).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.buffer(128)).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect()).isTrue(); // IO buffers should prefer to be direct - assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect()); - assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()); - assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect()); + assertThat(releaseLater(nettyBufferPool.ioBuffer()).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect()).isTrue(); // Currently we fakes the heap buffer allocation with direct buffers - assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect()); - assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()); - assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect()); + assertThat(releaseLater(nettyBufferPool.heapBuffer()).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()).isTrue(); + assertThat(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect()).isTrue(); // Composite buffers allocates the corresponding type of buffers when extending its capacity - assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect()); - assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect()); + assertThat(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect()) + .isTrue(); + assertThat(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect()) + .isTrue(); // Is direct buffer pooled! - assertTrue(nettyBufferPool.isDirectBufferPooled()); + assertThat(nettyBufferPool.isDirectBufferPooled()).isTrue(); } @Test - public void testAllocationsStatistics() throws Exception { + void testAllocationsStatistics() throws Exception { NettyBufferPool nettyBufferPool = new NettyBufferPool(1); int chunkSize = nettyBufferPool.getChunkSize(); { // Single large buffer allocates one chunk releaseLater(nettyBufferPool.directBuffer(chunkSize - 64)); - long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); - assertEquals(chunkSize, allocated); + assertThat(nettyBufferPool.getNumberOfAllocatedBytes()).hasValue((long) chunkSize); } { // Allocate a little more (one more chunk required) releaseLater(nettyBufferPool.directBuffer(128)); - long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); - assertEquals(2 * chunkSize, allocated); + assertThat(nettyBufferPool.getNumberOfAllocatedBytes()).hasValue(2L * chunkSize); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 58ba3e4785a..d0b0104d168 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -24,8 +24,10 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.net.SSLUtilsTest; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.NetUtils; -import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; @@ -33,13 +35,12 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecode import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder; import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import javax.net.ssl.SSLSessionContext; +import java.io.IOException; import java.net.InetAddress; import java.time.Duration; import java.util.List; @@ -48,31 +49,29 @@ import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_CLOSE_ import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT; import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE; import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the SSL connection between Netty Server and Client used for the data plane. */ -@RunWith(Parameterized.class) -public class NettyClientServerSslTest extends TestLogger { +@ExtendWith(ParameterizedTestExtension.class) +class NettyClientServerSslTest { - @Parameterized.Parameter public String sslProvider; + @Parameter private String sslProvider; - @Parameterized.Parameters(name = "SSL provider = {0}") + @Parameters(name = "SSL provider = {0}") public static List<String> parameters() { return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS; } /** Verify valid ssl configuration and connection. */ - @Test - public void testValidSslConnection() throws Exception { + @TestTemplate + void testValidSslConnection() throws Exception { testValidSslConnection(createSslConfig()); } /** Verify valid (advanced) ssl configuration and connection. */ - @Test - public void testValidSslConnectionAdvanced() throws Exception { + @TestTemplate + void testValidSslConnectionAdvanced() throws Exception { Configuration sslConfig = createSslConfig(); sslConfig.setInteger(SSL_INTERNAL_SESSION_CACHE_SIZE, 1); @@ -110,8 +109,9 @@ public class NettyClientServerSslTest extends TestLogger { final NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool); serverAndClient = new NettyServerAndClient(server, client); } - Assert.assertNotNull( - "serverAndClient is null due to fail to get a free port", serverAndClient); + assertThat(serverAndClient) + .withFailMessage("serverAndClient is null due to fail to get a free port") + .isNotNull(); Channel ch = NettyTestUtil.connect(serverAndClient); @@ -132,7 +132,7 @@ public class NettyClientServerSslTest extends TestLogger { // session context is only be available after a session was setup -> this should be true // after data was sent serverChannelInitComplete.await(); - assertNotNull(serverSslHandler[0]); + assertThat(serverSslHandler[0]).isNotNull(); // verify server parameters assertEqualsOrDefault( @@ -145,7 +145,9 @@ public class NettyClientServerSslTest extends TestLogger { serverSslHandler[0].getCloseNotifyFlushTimeoutMillis()); SSLSessionContext sessionContext = serverSslHandler[0].engine().getSession().getSessionContext(); - assertNotNull("bug in unit test setup: session context not available", sessionContext); + assertThat(sessionContext) + .withFailMessage("bug in unit test setup: session context not available") + .isNotNull(); // note: can't verify session cache setting at the client - delegate to server instead (with // our own channel initializer) assertEqualsOrDefault( @@ -153,11 +155,11 @@ public class NettyClientServerSslTest extends TestLogger { int sessionTimeout = sslConfig.getInteger(SSL_INTERNAL_SESSION_TIMEOUT); if (sessionTimeout != -1) { // session timeout config is in milliseconds but the context returns it in seconds - assertEquals(sessionTimeout / 1000, sessionContext.getSessionTimeout()); + assertThat(sessionContext.getSessionTimeout()).isEqualTo(sessionTimeout / 1000); } else { - assertTrue( - "default value (-1) should not be propagated", - sessionContext.getSessionTimeout() >= 0); + assertThat(sessionContext.getSessionTimeout()) + .withFailMessage("default value (-1) should not be propagated") + .isGreaterThanOrEqualTo(0); } NettyTestUtil.shutdown(serverAndClient); @@ -167,16 +169,17 @@ public class NettyClientServerSslTest extends TestLogger { Configuration sslConfig, ConfigOption<Integer> option, long actual) { long expected = sslConfig.getInteger(option); if (expected != option.defaultValue()) { - assertEquals(expected, actual); + assertThat(actual).isEqualTo(expected); } else { - assertTrue( - "default value (" + option.defaultValue() + ") should not be propagated", - actual >= 0); + assertThat(actual) + .withFailMessage( + "default value (%d) should not be propagated", option.defaultValue()) + .isGreaterThanOrEqualTo(0); } } /** Verify failure on invalid ssl configuration. */ - @Test + @TestTemplate public void testInvalidSslConfiguration() throws Exception { NettyProtocol protocol = new NettyTestUtil.NoOpProtocol(); @@ -184,22 +187,18 @@ public class NettyClientServerSslTest extends TestLogger { // Modify the keystore password to an incorrect one config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword"); - NettyTestUtil.NettyServerAndClient serverAndClient = null; try (NetUtils.Port port = NetUtils.getAvailablePort()) { NettyConfig nettyConfig = createNettyConfig(config, port); - serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); - Assert.fail("Created server and client from invalid configuration"); - } catch (Exception e) { - // Exception should be thrown as expected + assertThatThrownBy(() -> NettyTestUtil.initServerAndClient(protocol, nettyConfig)) + .withFailMessage("Created server and client from invalid configuration") + .isInstanceOf(IOException.class); } - - NettyTestUtil.shutdown(serverAndClient); } /** Verify SSL handshake error when untrusted server certificate is used. */ - @Test - public void testSslHandshakeError() throws Exception { + @TestTemplate + void testSslHandshakeError() throws Exception { NettyProtocol protocol = new NettyTestUtil.NoOpProtocol(); Configuration config = createSslConfig(); @@ -214,19 +213,20 @@ public class NettyClientServerSslTest extends TestLogger { serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); } - Assert.assertNotNull( - "serverAndClient is null due to fail to get a free port", serverAndClient); + assertThat(serverAndClient) + .withFailMessage("serverAndClient is null due to fail to get a free port") + .isNotNull(); Channel ch = NettyTestUtil.connect(serverAndClient); ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); // Attempting to write data over ssl should fail - assertFalse(ch.writeAndFlush("test").await().isSuccess()); + assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse(); NettyTestUtil.shutdown(serverAndClient); } - @Test - public void testClientUntrustedCertificate() throws Exception { + @TestTemplate + void testClientUntrustedCertificate() throws Exception { final Configuration serverConfig = createSslConfig(); final Configuration clientConfig = createSslConfig(); @@ -249,20 +249,21 @@ public class NettyClientServerSslTest extends TestLogger { NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool); serverAndClient = new NettyServerAndClient(server, client); } - Assert.assertNotNull( - "serverAndClient is null due to fail to get a free port", serverAndClient); + assertThat(serverAndClient) + .withFailMessage("serverAndClient is null due to fail to get a free port") + .isNotNull(); final Channel ch = NettyTestUtil.connect(serverAndClient); ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); // Attempting to write data over ssl should fail - assertFalse(ch.writeAndFlush("test").await().isSuccess()); + assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse(); NettyTestUtil.shutdown(serverAndClient); } - @Test - public void testSslPinningForValidFingerprint() throws Exception { + @TestTemplate + void testSslPinningForValidFingerprint() throws Exception { NettyProtocol protocol = new NettyTestUtil.NoOpProtocol(); Configuration config = createSslConfig(); @@ -277,19 +278,20 @@ public class NettyClientServerSslTest extends TestLogger { serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); } - Assert.assertNotNull( - "serverAndClient is null due to fail to get a free port", serverAndClient); + assertThat(serverAndClient) + .withFailMessage("serverAndClient is null due to fail to get a free port") + .isNotNull(); Channel ch = NettyTestUtil.connect(serverAndClient); ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); - assertTrue(ch.writeAndFlush("test").await().isSuccess()); + assertThat(ch.writeAndFlush("test").await().isSuccess()).isTrue(); NettyTestUtil.shutdown(serverAndClient); } - @Test - public void testSslPinningForInvalidFingerprint() throws Exception { + @TestTemplate + void testSslPinningForInvalidFingerprint() throws Exception { NettyProtocol protocol = new NettyTestUtil.NoOpProtocol(); Configuration config = createSslConfig(); @@ -305,13 +307,14 @@ public class NettyClientServerSslTest extends TestLogger { serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); } - Assert.assertNotNull( - "serverAndClient is null due to fail to get a free port", serverAndClient); + assertThat(serverAndClient) + .withFailMessage("serverAndClient is null due to fail to get a free port") + .isNotNull(); Channel ch = NettyTestUtil.connect(serverAndClient); ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); - assertFalse(ch.writeAndFlush("test").await().isSuccess()); + assertThat(ch.writeAndFlush("test").await().isSuccess()).isFalse(); NettyTestUtil.shutdown(serverAndClient); } @@ -320,8 +323,8 @@ public class NettyClientServerSslTest extends TestLogger { return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(sslProvider); } - private static NettyConfig createNettyConfig(Configuration config, NetUtils.Port availablePort) - throws Exception { + private static NettyConfig createNettyConfig( + Configuration config, NetUtils.Port availablePort) { return new NettyConfig( InetAddress.getLoopbackAddress(), availablePort.getPort(), @@ -354,7 +357,7 @@ public class NettyClientServerSslTest extends TestLogger { super.initChannel(channel); SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); - assertNotNull(sslHandler); + assertThat(sslHandler).isNotNull(); serverHandler[0] = sslHandler; latch.trigger(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java index 6e29197ede7..dd3d0cff568 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java @@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; /** Tests for {@link NettyClient}. */ -public class NettyClientTest { +class NettyClientTest { @Test void testSetKeepaliveOptionWithNioConfigurable() throws Exception { assumeThat(keepaliveForNioConfigurable()).isTrue(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java index f93381db59d..85015690d69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -27,23 +27,22 @@ import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.lang.reflect.Field; import java.net.InetAddress; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.assertj.core.api.Assertions.assertThat; /** Simple netty connection manager test. */ -public class NettyConnectionManagerTest { +class NettyConnectionManagerTest { /** * Tests that the number of arenas and number of threads of the client and server are set to the * same number, that is the number of configured task slots. */ @Test - public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { + void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { // Expected number of arenas and threads int numberOfSlots = 2; NettyConnectionManager connectionManager; @@ -59,10 +58,11 @@ public class NettyConnectionManagerTest { connectionManager = createNettyConnectionManager(config); connectionManager.start(); } - assertNotNull( - "connectionManager is null due to fail to get a free port", connectionManager); + assertThat(connectionManager) + .withFailMessage("connectionManager is null due to fail to get a free port") + .isNotNull(); - assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas()); + assertThat(connectionManager.getBufferPool().getNumberOfArenas()).isEqualTo(numberOfSlots); { // Client event loop group @@ -73,7 +73,7 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfSlots, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfSlots); } { @@ -85,7 +85,7 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfSlots, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfSlots); } { @@ -97,13 +97,13 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfSlots, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfSlots); } } /** Tests that the number of arenas and threads can be configured manually. */ @Test - public void testManualConfiguration() throws Exception { + void testManualConfiguration() throws Exception { // Expected numbers int numberOfArenas = 1; int numberOfClientThreads = 3; @@ -123,10 +123,12 @@ public class NettyConnectionManagerTest { connectionManager = createNettyConnectionManager(config); connectionManager.start(); - assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas()); + assertThat(connectionManager.getBufferPool().getNumberOfArenas()) + .isEqualTo(numberOfArenas); } - assertNotNull( - "connectionManager is null due to fail to get a free port", connectionManager); + assertThat(connectionManager) + .withFailMessage("connectionManager is null due to fail to get a free port") + .isNotNull(); { // Client event loop group @@ -137,7 +139,7 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfClientThreads, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfClientThreads); } { @@ -149,7 +151,7 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfServerThreads, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfServerThreads); } { @@ -161,7 +163,7 @@ public class NettyConnectionManagerTest { f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); - assertEquals(numberOfServerThreads, eventExecutors.length); + assertThat(eventExecutors).hasSize(numberOfServerThreads); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java index 801e478b340..0b008d6300f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java @@ -30,14 +30,13 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import javax.annotation.Nullable; @@ -45,16 +44,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; -import static org.junit.Assert.assertNull; +import static org.assertj.core.api.Assertions.assertThat; /** Tests the client side message decoder. */ -public class NettyMessageClientDecoderDelegateTest extends TestLogger { +class NettyMessageClientDecoderDelegateTest { private static final int BUFFER_SIZE = 1024; @@ -72,8 +69,8 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { private InputChannelID releasedInputChannelId; - @Before - public void setup() throws IOException, InterruptedException { + @BeforeEach + void setup() throws IOException, InterruptedException { CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); networkBufferPool = new NetworkBufferPool(NUMBER_OF_BUFFER_RESPONSES, BUFFER_SIZE); @@ -97,8 +94,8 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { releasedInputChannelId = releasedInputChannel.getInputChannelId(); } - @After - public void tearDown() throws IOException { + @AfterEach + void tearDown() throws IOException { if (inputGate != null) { inputGate.close(); } @@ -115,7 +112,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { /** Verifies that the client side decoder works well for unreleased input channels. */ @Test - public void testClientMessageDecode() throws Exception { + void testClientMessageDecode() throws Exception { testNettyMessageClientDecoding(false, false, false); } @@ -124,7 +121,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { * consume data buffers of the input channels. */ @Test - public void testClientMessageDecodeWithEmptyBuffers() throws Exception { + void testClientMessageDecodeWithEmptyBuffers() throws Exception { testNettyMessageClientDecoding(true, false, false); } @@ -133,7 +130,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { * channel. The data buffer part should be discarded before reading the next message. */ @Test - public void testClientMessageDecodeWithReleasedInputChannel() throws Exception { + void testClientMessageDecodeWithReleasedInputChannel() throws Exception { testNettyMessageClientDecoding(false, true, false); } @@ -142,7 +139,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { * channel. The data buffer part should be discarded before reading the next message. */ @Test - public void testClientMessageDecodeWithRemovedInputChannel() throws Exception { + void testClientMessageDecodeWithRemovedInputChannel() throws Exception { testNettyMessageClientDecoding(false, false, true); } @@ -318,7 +315,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { List<NettyMessage> decodedMessages = new ArrayList<>(); Object input; while ((input = channel.readInbound()) != null) { - assertTrue(input instanceof NettyMessage); + assertThat(input).isInstanceOf(NettyMessage.class); decodedMessages.add((NettyMessage) input); } @@ -327,17 +324,17 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger { private void verifyDecodedMessages( List<BufferResponse> expectedMessages, List<NettyMessage> decodedMessages) { - assertEquals(expectedMessages.size(), decodedMessages.size()); + assertThat(decodedMessages).hasSameSizeAs(expectedMessages); for (int i = 0; i < expectedMessages.size(); ++i) { - assertEquals(expectedMessages.get(i).getClass(), decodedMessages.get(i).getClass()); + assertThat(decodedMessages.get(i)).isInstanceOf(expectedMessages.get(i).getClass()); BufferResponse expected = expectedMessages.get(i); BufferResponse actual = (BufferResponse) decodedMessages.get(i); verifyBufferResponseHeader(expected, actual); if (expected.bufferSize == 0 || !expected.receiverId.equals(inputChannelId)) { - assertNull(actual.getBuffer()); + assertThat(actual.getBuffer()).isNull(); } else { - assertEquals(expected.getBuffer(), actual.getBuffer()); + assertThat(actual.getBuffer()).isEqualTo(expected.getBuffer()); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java index b6927e5c1ec..153836415b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java @@ -30,14 +30,12 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -60,7 +58,6 @@ import static org.assertj.core.api.Assertions.assertThat; * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes * sent from server side to client side. */ -@ExtendWith(TestLoggerExtension.class) class NettyMessageClientSideSerializationTest { private static final int BUFFER_SIZE = 1024; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java index 3aedfa3bd51..57f20e4c61f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java @@ -21,46 +21,45 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.event.task.IntegerTaskEvent; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; -import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Random; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes * sent from client side to server side. */ -public class NettyMessageServerSideSerializationTest extends TestLogger { +class NettyMessageServerSideSerializationTest { private final Random random = new Random(); private EmbeddedChannel channel; - @Before - public void setup() { + @BeforeEach + void setup() { channel = new EmbeddedChannel( new NettyMessage.NettyMessageEncoder(), // For outbound messages new NettyMessage.NettyMessageDecoder()); // For inbound messages } - @After - public void tearDown() { + @AfterEach + void tearDown() { if (channel != null) { channel.close(); } } @Test - public void testPartitionRequest() { + void testPartitionRequest() { NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest( new ResultPartitionID(), @@ -70,14 +69,14 @@ public class NettyMessageServerSideSerializationTest extends TestLogger { NettyMessage.PartitionRequest actual = encodeAndDecode(expected, channel); - assertEquals(expected.partitionId, actual.partitionId); - assertEquals(expected.queueIndex, actual.queueIndex); - assertEquals(expected.receiverId, actual.receiverId); - assertEquals(expected.credit, actual.credit); + assertThat(actual.partitionId).isEqualTo(expected.partitionId); + assertThat(actual.queueIndex).isEqualTo(expected.queueIndex); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); + assertThat(actual.credit).isEqualTo(expected.credit); } @Test - public void testTaskEventRequest() { + void testTaskEventRequest() { NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest( new IntegerTaskEvent(random.nextInt()), @@ -85,65 +84,65 @@ public class NettyMessageServerSideSerializationTest extends TestLogger { new InputChannelID()); NettyMessage.TaskEventRequest actual = encodeAndDecode(expected, channel); - assertEquals(expected.event, actual.event); - assertEquals(expected.partitionId, actual.partitionId); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.event).isEqualTo(expected.event); + assertThat(actual.partitionId).isEqualTo(expected.partitionId); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } @Test - public void testCancelPartitionRequest() { + void testCancelPartitionRequest() { NettyMessage.CancelPartitionRequest expected = new NettyMessage.CancelPartitionRequest(new InputChannelID()); NettyMessage.CancelPartitionRequest actual = encodeAndDecode(expected, channel); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } @Test - public void testCloseRequest() { + void testCloseRequest() { NettyMessage.CloseRequest expected = new NettyMessage.CloseRequest(); NettyMessage.CloseRequest actual = encodeAndDecode(expected, channel); - assertEquals(expected.getClass(), actual.getClass()); + assertThat(actual).isExactlyInstanceOf(expected.getClass()); } @Test - public void testAddCredit() { + void testAddCredit() { NettyMessage.AddCredit expected = new NettyMessage.AddCredit( random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID()); NettyMessage.AddCredit actual = encodeAndDecode(expected, channel); - assertEquals(expected.credit, actual.credit); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.credit).isEqualTo(expected.credit); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } @Test - public void testResumeConsumption() { + void testResumeConsumption() { NettyMessage.ResumeConsumption expected = new NettyMessage.ResumeConsumption(new InputChannelID()); NettyMessage.ResumeConsumption actual = encodeAndDecode(expected, channel); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } @Test - public void testAckAllUserRecordsProcessed() { + void testAckAllUserRecordsProcessed() { NettyMessage.AckAllUserRecordsProcessed expected = new NettyMessage.AckAllUserRecordsProcessed(new InputChannelID()); NettyMessage.AckAllUserRecordsProcessed actual = encodeAndDecode(expected, channel); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } @Test - public void testNewBufferSize() { + void testNewBufferSize() { NettyMessage.NewBufferSize expected = new NettyMessage.NewBufferSize( random.nextInt(Integer.MAX_VALUE), new InputChannelID()); NettyMessage.NewBufferSize actual = encodeAndDecode(expected, channel); - assertEquals(expected.bufferSize, actual.bufferSize); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(actual.bufferSize).isEqualTo(expected.bufferSize); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java index ccfef5f2442..191deba182a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java @@ -30,13 +30,15 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -44,25 +46,20 @@ import java.net.InetSocketAddress; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link NettyPartitionRequestClient}. */ -@RunWith(Parameterized.class) -public class NettyPartitionRequestClientTest { - @Parameterized.Parameter public boolean connectionReuseEnabled; +@ExtendWith(ParameterizedTestExtension.class) +class NettyPartitionRequestClientTest { + @Parameter private boolean connectionReuseEnabled; - @Parameterized.Parameters(name = "connection reuse enabled = {0}") - public static Object[] parameters() { + @Parameters(name = "connection reuse enabled = {0}") + private static Object[] parameters() { return new Object[][] {new Object[] {true}, new Object[] {false}}; } - @Test - public void testPartitionRequestClientReuse() throws Exception { + @TestTemplate + void testPartitionRequestClientReuse() throws Exception { final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); @@ -76,11 +73,11 @@ public class NettyPartitionRequestClientTest { try { // Client should not be disposed in idle client.close(inputChannel); - assertFalse(client.canBeDisposed()); + assertThat(client.canBeDisposed()).isFalse(); // Client should be disposed in error handler.notifyAllChannelsOfErrorAndClose(new RuntimeException()); - assertTrue(client.canBeDisposed()); + assertThat(client.canBeDisposed()).isTrue(); } finally { // Release all the buffer resources inputGate.close(); @@ -90,8 +87,8 @@ public class NettyPartitionRequestClientTest { } } - @Test - public void testRetriggerPartitionRequest() throws Exception { + @TestTemplate + void testRetriggerPartitionRequest() throws Exception { final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs final CreditBasedPartitionRequestClientHandler handler = @@ -120,13 +117,12 @@ public class NettyPartitionRequestClientTest { // first subpartition request inputChannel.requestSubpartition(); - assertTrue(channel.isWritable()); + assertThat(channel.isWritable()).isTrue(); Object readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((PartitionRequest) readFromOutbound).receiverId); - assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); + assertThat(((PartitionRequest) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); + assertThat(((PartitionRequest) readFromOutbound).credit).isEqualTo(numExclusiveBuffers); // retrigger subpartition request, e.g. due to failures inputGate.retriggerPartitionRequest( @@ -135,11 +131,10 @@ public class NettyPartitionRequestClientTest { runAllScheduledPendingTasks(channel, deadline); readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((PartitionRequest) readFromOutbound).receiverId); - assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); + assertThat(((PartitionRequest) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); + assertThat(((PartitionRequest) readFromOutbound).credit).isEqualTo(numExclusiveBuffers); // retrigger subpartition request once again, e.g. due to failures inputGate.retriggerPartitionRequest( @@ -148,13 +143,12 @@ public class NettyPartitionRequestClientTest { runAllScheduledPendingTasks(channel, deadline); readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((PartitionRequest) readFromOutbound).receiverId); - assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); + assertThat(((PartitionRequest) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); + assertThat(((PartitionRequest) readFromOutbound).credit).isEqualTo(numExclusiveBuffers); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); } finally { // Release all the buffer resources inputGate.close(); @@ -164,8 +158,8 @@ public class NettyPartitionRequestClientTest { } } - @Test - public void testDoublePartitionRequest() throws Exception { + @TestTemplate + void testDoublePartitionRequest() throws Exception { final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); @@ -185,15 +179,14 @@ public class NettyPartitionRequestClientTest { inputChannel.requestSubpartition(); // The input channel should only send one partition request - assertTrue(channel.isWritable()); + assertThat(channel.isWritable()).isTrue(); Object readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((PartitionRequest) readFromOutbound).receiverId); - assertEquals(numExclusiveBuffers, ((PartitionRequest) readFromOutbound).credit); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); + assertThat(((PartitionRequest) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); + assertThat(((PartitionRequest) readFromOutbound).credit).isEqualTo(numExclusiveBuffers); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); } finally { // Release all the buffer resources inputGate.close(); @@ -203,8 +196,8 @@ public class NettyPartitionRequestClientTest { } } - @Test - public void testResumeConsumption() throws Exception { + @TestTemplate + void testResumeConsumption() throws Exception { final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); @@ -224,15 +217,14 @@ public class NettyPartitionRequestClientTest { inputChannel.resumeConsumption(); channel.runPendingTasks(); Object readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(ResumeConsumption.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((ResumeConsumption) readFromOutbound).receiverId); + assertThat(readFromOutbound).isInstanceOf(ResumeConsumption.class); + assertThat(((ResumeConsumption) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); } finally { // Release all the buffer resources inputGate.close(); @@ -242,8 +234,8 @@ public class NettyPartitionRequestClientTest { } } - @Test - public void testAcknowledgeAllRecordsProcessed() throws Exception { + @TestTemplate + void testAcknowledgeAllRecordsProcessed() throws Exception { CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); EmbeddedChannel channel = new EmbeddedChannel(handler); @@ -263,15 +255,15 @@ public class NettyPartitionRequestClientTest { inputChannel.acknowledgeAllRecordsProcessed(); channel.runPendingTasks(); Object readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertThat(readFromOutbound).isInstanceOf(PartitionRequest.class); readFromOutbound = channel.readOutbound(); - assertThat(readFromOutbound, instanceOf(NettyMessage.AckAllUserRecordsProcessed.class)); - assertEquals( - inputChannel.getInputChannelId(), - ((NettyMessage.AckAllUserRecordsProcessed) readFromOutbound).receiverId); + assertThat(readFromOutbound) + .isInstanceOf(NettyMessage.AckAllUserRecordsProcessed.class); + assertThat(((NettyMessage.AckAllUserRecordsProcessed) readFromOutbound).receiverId) + .isEqualTo(inputChannel.getInputChannelId()); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); } finally { // Release all the buffer resources inputGate.close(); @@ -281,7 +273,7 @@ public class NettyPartitionRequestClientTest { } } - private NettyPartitionRequestClient createPartitionRequestClient( + private static NettyPartitionRequestClient createPartitionRequestClient( Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled) throws Exception { ConnectionID connectionID = @@ -304,7 +296,7 @@ public class NettyPartitionRequestClientTest { * @param deadline maximum timestamp in ms to stop waiting further * @throws InterruptedException */ - void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) + private static void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) throws InterruptedException { // NOTE: we don't have to be super fancy here; busy-polling with 1ms delays is enough while (channel.runScheduledPendingTasks() != -1 && System.currentTimeMillis() < deadline) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java index ca27b8bd6e0..6b78aee13a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerFromPortRangeTest.java @@ -54,7 +54,7 @@ class NettyServerFromPortRangeTest { assertThat(listeningPort2).isEqualTo(port2.getPort()); } - private NettyConfig getConfig(NetUtils.Port... ports) { + private static NettyConfig getConfig(NetUtils.Port... ports) { String portRangeStr = Arrays.stream(ports) .map(NetUtils.Port::getPort) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java index 42dc38c8e30..8605bcd1498 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java @@ -34,13 +34,12 @@ import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static junit.framework.TestCase.assertEquals; import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Test utility for Netty server and client setup. */ public class NettyTestUtil { @@ -198,7 +197,7 @@ public class NettyTestUtil { while ((encoded = channel.readOutbound()) != null) { msgNotEmpty = channel.writeInbound(encoded); } - assertTrue(msgNotEmpty); + assertThat(msgNotEmpty).isTrue(); return channel.readInbound(); } @@ -208,20 +207,20 @@ public class NettyTestUtil { // --------------------------------------------------------------------------------------------- static void verifyErrorResponse(ErrorResponse expected, ErrorResponse actual) { - assertEquals(expected.receiverId, actual.receiverId); - assertEquals(expected.cause.getClass(), actual.cause.getClass()); - assertEquals(expected.cause.getMessage(), actual.cause.getMessage()); + assertThat(actual.receiverId).isEqualTo(expected.receiverId); + assertThat(expected.cause).hasSameClassAs(actual.cause); + assertThat(expected.cause.getMessage()).isEqualTo(actual.cause.getMessage()); if (expected.receiverId == null) { - assertTrue(actual.isFatalError()); + assertThat(actual.isFatalError()).isTrue(); } } static void verifyBufferResponseHeader(BufferResponse expected, BufferResponse actual) { - assertEquals(expected.backlog, actual.backlog); - assertEquals(expected.sequenceNumber, actual.sequenceNumber); - assertEquals(expected.bufferSize, actual.bufferSize); - assertEquals(expected.receiverId, actual.receiverId); + assertThat(expected.backlog).isEqualTo(actual.backlog); + assertThat(expected.sequenceNumber).isEqualTo(actual.sequenceNumber); + assertThat(expected.bufferSize).isEqualTo(actual.bufferSize); + assertThat(expected.receiverId).isEqualTo(actual.receiverId); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index d923d76a6c9..15c5e435e61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -22,10 +22,10 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; -import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; @@ -36,6 +36,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdap import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; import java.net.InetAddress; @@ -47,26 +48,30 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; /** {@link PartitionRequestClientFactory} test. */ @ExtendWith(ParameterizedTestExtension.class) -public class PartitionRequestClientFactoryTest extends TestLogger { +class PartitionRequestClientFactoryTest { private static final ResourceID RESOURCE_ID = ResourceID.generate(); - @Parameter public boolean connectionReuseEnabled; + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>(() -> Executors.newFixedThreadPool(10)); + + @Parameter private boolean connectionReuseEnabled; @Parameters(name = "connectionReuseEnabled={0}") - public static Collection<Boolean> parameters() { + private static Collection<Boolean> parameters() { return Arrays.asList(false, true); } @@ -163,7 +168,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { RESOURCE_ID, (int) (Math.random() * Integer.MAX_VALUE)); set.add(factory.createPartitionRequestClient(connectionID)); } - assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections); + assertThat(set).hasSizeLessThanOrEqualTo(maxNumberOfConnections); } /** @@ -288,45 +293,31 @@ public class PartitionRequestClientFactoryTest extends TestLogger { new PartitionRequestClientFactory( unstableNettyClient, 2, 1, connectionReuseEnabled); - ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10); - List<Future<NettyPartitionRequestClient>> futures = new ArrayList<>(); + List<CompletableFuture<NettyPartitionRequestClient>> futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { - Future<NettyPartitionRequestClient> future = - threadPoolExecutor.submit( + futures.add( + CompletableFuture.supplyAsync( () -> { - NettyPartitionRequestClient client = null; try { - client = - factory.createPartitionRequestClient( - serverAndClient.getConnectionID( - RESOURCE_ID, 0)); + return factory.createPartitionRequestClient( + serverAndClient.getConnectionID(RESOURCE_ID, 0)); } catch (Exception e) { - fail(e.getMessage()); + throw new CompletionException(e); } - return client; - }); - - futures.add(future); + }, + EXECUTOR_EXTENSION.getExecutor())); } futures.forEach( - runnableFuture -> { - NettyPartitionRequestClient client; - try { - client = runnableFuture.get(); - assertThat(client).isNotNull(); - } catch (Exception e) { - System.out.println(e.getMessage()); - fail(); - } - }); + runnableFuture -> + assertThatFuture(runnableFuture).eventuallySucceeds().isNotNull()); - threadPoolExecutor.shutdown(); shutdown(serverAndClient); } - private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception { + private static NettyTestUtil.NettyServerAndClient createNettyServerAndClient() + throws Exception { return NettyTestUtil.initServerAndClient( new NettyProtocol(null, null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 1a16710a811..e113f817d4b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -46,48 +46,38 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link PartitionRequestQueue}. */ -public class PartitionRequestQueueTest { - - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); +class PartitionRequestQueueTest { private static final int BUFFER_SIZE = 1024 * 1024; private static FileChannelManager fileChannelManager; - @BeforeClass - public static void setUp() throws Exception { + @BeforeAll + static void setUp(@TempDir File temporaryFolder) { fileChannelManager = new FileChannelManagerImpl( - new String[] {TEMPORARY_FOLDER.newFolder().getAbsolutePath()}, "testing"); + new String[] {temporaryFolder.getAbsolutePath()}, "testing"); } - @AfterClass - public static void shutdown() throws Exception { + @AfterAll + static void shutdown() throws Exception { fileChannelManager.close(); } @@ -103,13 +93,12 @@ public class PartitionRequestQueueTest { reader.requestSubpartitionViewOrRegisterListener( resultPartitionManager, resultPartitionId, 0); - assertEquals( - resultPartitionManager - .getListenerManagers() - .get(resultPartitionId) - .getPartitionRequestListeners() - .size(), - 1); + assertThat( + resultPartitionManager + .getListenerManagers() + .get(resultPartitionId) + .getPartitionRequestListeners()) + .hasSize(1); reader.notifyPartitionRequestTimeout( resultPartitionManager @@ -122,11 +111,12 @@ public class PartitionRequestQueueTest { channel.runPendingTasks(); Object read = channel.readOutbound(); - assertNotNull(read); - assertThat(read, instanceOf(NettyMessage.ErrorResponse.class)); - assertThat( - ((NettyMessage.ErrorResponse) read).cause, - instanceOf(PartitionNotFoundException.class)); + assertThat(read) + .isNotNull() + .isInstanceOf(NettyMessage.ErrorResponse.class) + .isInstanceOfSatisfying( + NettyMessage.ErrorResponse.class, + r -> assertThat(r.cause).isInstanceOf(PartitionNotFoundException.class)); } /** @@ -135,7 +125,7 @@ public class PartitionRequestQueueTest { * messages. */ @Test - public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception { + void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception { final int buffersToWrite = 5; PartitionRequestQueue queue = new PartitionRequestQueue(); EmbeddedChannel channel = new EmbeddedChannel(queue); @@ -153,11 +143,11 @@ public class PartitionRequestQueueTest { .build(), 0); reader1.notifyDataAvailable(); - assertTrue(reader1.getAvailabilityAndBacklog().isAvailable()); - assertFalse(reader1.isRegisteredAsAvailable()); + assertThat(reader1.getAvailabilityAndBacklog().isAvailable()).isTrue(); + assertThat(reader1.isRegisteredAsAvailable()).isFalse(); channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false); - assertFalse(channel.isWritable()); + assertThat(channel.isWritable()).isFalse(); reader1.notifyDataAvailable(); channel.runPendingTasks(); @@ -170,26 +160,26 @@ public class PartitionRequestQueueTest { new DefaultBufferResultSubpartitionView(buffersToWrite)) .build(), 0); - assertTrue(reader2.getAvailabilityAndBacklog().isAvailable()); - assertFalse(reader2.isRegisteredAsAvailable()); + assertThat(reader2.getAvailabilityAndBacklog().isAvailable()).isTrue(); + assertThat(reader2.isRegisteredAsAvailable()).isFalse(); reader2.notifyDataAvailable(); // changing a channel writability should result in draining both reader1 and reader2 channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true); channel.runPendingTasks(); - assertEquals(buffersToWrite, channel.outboundMessages().size()); + assertThat(channel.outboundMessages()).hasSize(buffersToWrite); } /** Tests {@link PartitionRequestQueue} buffer writing with default buffers. */ @Test - public void testDefaultBufferWriting() throws Exception { + void testDefaultBufferWriting() throws Exception { testBufferWriting(new DefaultBufferResultSubpartitionView(1)); } /** Tests {@link PartitionRequestQueue} buffer writing with read-only buffers. */ @Test - public void testReadOnlyBufferWriting() throws Exception { + void testReadOnlyBufferWriting() throws Exception { testBufferWriting(new ReadOnlyBufferResultSubpartitionView(1)); } @@ -214,13 +204,13 @@ public class PartitionRequestQueueTest { channel.runPendingTasks(); Object read = channel.readOutbound(); - assertNotNull(read); + assertThat(read).isNotNull(); if (read instanceof NettyMessage.ErrorResponse) { ((NettyMessage.ErrorResponse) read).cause.printStackTrace(); } - assertThat(read, instanceOf(NettyMessage.BufferResponse.class)); + assertThat(read).isInstanceOf(NettyMessage.BufferResponse.class); read = channel.readOutbound(); - assertNull(read); + assertThat(read).isNull(); } private static class DefaultBufferResultSubpartitionView extends NoOpResultSubpartitionView { @@ -287,7 +277,7 @@ public class PartitionRequestQueueTest { * even though it has no available credits. */ @Test - public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + void testEnqueueReaderByNotifyingEventBuffer() throws Exception { // setup final ResultSubpartitionView view = new NextIsEventResultSubpartitionView(); @@ -306,7 +296,7 @@ public class PartitionRequestQueueTest { // block the channel so that we see an intermediate state in the test ByteBuf channelBlockingBuffer = blockChannel(channel); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); // Notify an available event buffer to trigger enqueue the reader reader.notifyDataAvailable(); @@ -315,16 +305,16 @@ public class PartitionRequestQueueTest { // The reader is enqueued in the pipeline because the next buffer is an event, even though // no credits are available - assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one! - assertEquals(0, reader.getNumCreditsAvailable()); + assertThat(queue.getAvailableReaders()).contains(reader); // contains only (this) one! + assertThat(reader.getNumCreditsAvailable()).isZero(); // Flush the buffer to make the channel writable again and see the final results channel.flush(); - assertSame(channelBlockingBuffer, channel.readOutbound()); + assertThat((ByteBuf) channel.readOutbound()).isSameAs(channelBlockingBuffer); - assertEquals(0, queue.getAvailableReaders().size()); - assertEquals(0, reader.getNumCreditsAvailable()); - assertNull(channel.readOutbound()); + assertThat(queue.getAvailableReaders()).isEmpty(); + assertThat(reader.getNumCreditsAvailable()).isZero(); + assertThat((Object) channel.readOutbound()).isNull(); } private static class NextIsEventResultSubpartitionView extends NoOpResultSubpartitionView { @@ -340,7 +330,7 @@ public class PartitionRequestQueueTest { * buffers. */ @Test - public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { // setup final ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(10); @@ -361,7 +351,7 @@ public class PartitionRequestQueueTest { // block the channel so that we see an intermediate state in the test ByteBuf channelBlockingBuffer = blockChannel(channel); - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); // Notify available buffers to trigger enqueue the reader final int notifyNumBuffers = 5; @@ -373,10 +363,10 @@ public class PartitionRequestQueueTest { // the reader is not enqueued in the pipeline because no credits are available // -> it should still have the same number of pending buffers - assertEquals(0, queue.getAvailableReaders().size()); - assertTrue(reader.hasBuffersAvailable().isAvailable()); - assertFalse(reader.isRegisteredAsAvailable()); - assertEquals(0, reader.getNumCreditsAvailable()); + assertThat(queue.getAvailableReaders()).isEmpty(); + assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue(); + assertThat(reader.isRegisteredAsAvailable()).isFalse(); + assertThat(reader.getNumCreditsAvailable()).isZero(); // Notify available credits to trigger enqueue the reader again final int notifyNumCredits = 3; @@ -388,24 +378,25 @@ public class PartitionRequestQueueTest { // since the channel is blocked though, we will not process anything and only enqueue // the // reader once - assertTrue(reader.isRegisteredAsAvailable()); - assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one! - assertEquals(i, reader.getNumCreditsAvailable()); - assertTrue(reader.hasBuffersAvailable().isAvailable()); + assertThat(reader.isRegisteredAsAvailable()).isTrue(); + assertThat(queue.getAvailableReaders()).contains(reader); // contains only (this) one! + assertThat(reader.getNumCreditsAvailable()).isEqualTo(i); + assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue(); } // Flush the buffer to make the channel writable again and see the final results channel.flush(); - assertSame(channelBlockingBuffer, channel.readOutbound()); + assertThat((ByteBuf) channel.readOutbound()).isSameAs(channelBlockingBuffer); - assertEquals(0, queue.getAvailableReaders().size()); - assertEquals(0, reader.getNumCreditsAvailable()); - assertTrue(reader.hasBuffersAvailable().isAvailable()); - assertFalse(reader.isRegisteredAsAvailable()); + assertThat(queue.getAvailableReaders()).isEmpty(); + assertThat(reader.getNumCreditsAvailable()).isZero(); + assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue(); + assertThat(reader.isRegisteredAsAvailable()).isFalse(); for (int i = 1; i <= notifyNumCredits; i++) { - assertThat(channel.readOutbound(), instanceOf(NettyMessage.BufferResponse.class)); + assertThat((Object) channel.readOutbound()) + .isInstanceOf(NettyMessage.BufferResponse.class); } - assertNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNull(); } /** @@ -414,7 +405,7 @@ public class PartitionRequestQueueTest { * there are credit and data available. */ @Test - public void testEnqueueReaderByResumingConsumption() throws Exception { + void testEnqueueReaderByResumingConsumption() throws Exception { PipelinedSubpartition subpartition = PipelinedSubpartitionTest.createPipelinedSubpartition(); Buffer.DataType dataType1 = Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER; @@ -437,26 +428,26 @@ public class PartitionRequestQueueTest { reader.notifySubpartitionCreated(partition, 0); queue.notifyReaderCreated(reader); - assertTrue(reader.getAvailabilityAndBacklog().isAvailable()); + assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isTrue(); reader.notifyDataAvailable(); channel.runPendingTasks(); - assertFalse(reader.getAvailabilityAndBacklog().isAvailable()); - assertEquals(1, subpartition.unsynchronizedGetNumberOfQueuedBuffers()); + assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse(); + assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isOne(); queue.addCreditOrResumeConsumption( receiverId, NetworkSequenceViewReader::resumeConsumption); - assertFalse(reader.getAvailabilityAndBacklog().isAvailable()); - assertEquals(0, subpartition.unsynchronizedGetNumberOfQueuedBuffers()); + assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse(); + assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isZero(); Object data1 = channel.readOutbound(); - assertEquals(dataType1, ((NettyMessage.BufferResponse) data1).buffer.getDataType()); + assertThat(((NettyMessage.BufferResponse) data1).buffer.getDataType()).isEqualTo(dataType1); Object data2 = channel.readOutbound(); - assertEquals(dataType2, ((NettyMessage.BufferResponse) data2).buffer.getDataType()); + assertThat(((NettyMessage.BufferResponse) data2).buffer.getDataType()).isEqualTo(dataType2); } @Test - public void testAnnounceBacklog() throws Exception { + void testAnnounceBacklog() throws Exception { PipelinedSubpartition subpartition = PipelinedSubpartitionTest.createPipelinedSubpartition(); subpartition.add(createEventBufferConsumer(4096, Buffer.DataType.DATA_BUFFER)); @@ -481,24 +472,24 @@ public class PartitionRequestQueueTest { reader.notifyDataAvailable(); channel.runPendingTasks(); Object data = channel.readOutbound(); - assertTrue(data instanceof NettyMessage.BacklogAnnouncement); + assertThat(data).isInstanceOf(NettyMessage.BacklogAnnouncement.class); NettyMessage.BacklogAnnouncement announcement = (NettyMessage.BacklogAnnouncement) data; - assertEquals(receiverId, announcement.receiverId); - assertEquals(subpartition.getBuffersInBacklogUnsafe(), announcement.backlog); + assertThat(announcement.receiverId).isEqualTo(receiverId); + assertThat(announcement.backlog).isEqualTo(subpartition.getBuffersInBacklogUnsafe()); subpartition.release(); reader.notifyDataAvailable(); channel.runPendingTasks(); - assertNotNull(channel.readOutbound()); + assertThat((Object) channel.readOutbound()).isNotNull(); } @Test - public void testCancelPartitionRequestForUnavailableView() throws Exception { + void testCancelPartitionRequestForUnavailableView() throws Exception { testCancelPartitionRequest(false); } @Test - public void testCancelPartitionRequestForAvailableView() throws Exception { + void testCancelPartitionRequestForAvailableView() throws Exception { testCancelPartitionRequest(true); } @@ -522,18 +513,18 @@ public class PartitionRequestQueueTest { // add credit to make this reader available for adding into availableReaders queue if (isAvailableView) { queue.addCreditOrResumeConsumption(receiverId, viewReader -> viewReader.addCredit(1)); - assertTrue(queue.getAvailableReaders().contains(reader)); + assertThat(queue.getAvailableReaders()).contains(reader); } // cancel this subpartition view queue.cancel(receiverId); channel.runPendingTasks(); - assertFalse(queue.getAvailableReaders().contains(reader)); + assertThat(queue.getAvailableReaders()).doesNotContain(reader); // the reader view should be released (the partition is not, though, blocking partitions // support multiple successive readers for recovery and caching) - assertTrue(reader.isReleased()); + assertThat(reader.isReleased()).isTrue(); // cleanup partition.release(); @@ -541,7 +532,7 @@ public class PartitionRequestQueueTest { } @Test - public void testNotifyNewBufferSize() throws Exception { + void testNotifyNewBufferSize() throws Exception { // given: Result partition and the reader for subpartition 0. ResultPartition parent = createResultPartition(); @@ -575,10 +566,10 @@ public class PartitionRequestQueueTest { // then: Buffers of received size will be in outbound channel. Object data1 = channel.readOutbound(); // The size can not be less than the first record in buffer. - assertEquals(128, ((NettyMessage.BufferResponse) data1).buffer.getSize()); + assertThat(((NettyMessage.BufferResponse) data1).buffer.getSize()).isEqualTo(128); Object data2 = channel.readOutbound(); // The size should shrink up to notified buffer size. - assertEquals(65, ((NettyMessage.BufferResponse) data2).buffer.getSize()); + assertThat(((NettyMessage.BufferResponse) data2).buffer.getSize()).isEqualTo(65); } private static ResultPartition createResultPartition() throws IOException { @@ -629,7 +620,7 @@ public class PartitionRequestQueueTest { // to the wire although the buffer is "empty". ByteBuf channelBlockingBuffer = Unpooled.buffer(highWaterMark).writerIndex(highWaterMark); channel.write(channelBlockingBuffer); - assertFalse(channel.isWritable()); + assertThat(channel.isWritable()).isFalse(); return channelBlockingBuffer; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java index d7941ef352d..edc349b82ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java @@ -27,24 +27,21 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; -import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link PartitionRequestServerHandler}. */ -public class PartitionRequestServerHandlerTest extends TestLogger { +class PartitionRequestServerHandlerTest { @Test - public void testResumeConsumption() { + void testResumeConsumption() { final InputChannelID inputChannelID = new InputChannelID(); final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue(); final TestViewReader testViewReader = @@ -61,11 +58,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger { channel.writeInbound(new ResumeConsumption(inputChannelID)); channel.runPendingTasks(); - assertTrue(testViewReader.consumptionResumed); + assertThat(testViewReader.consumptionResumed).isTrue(); } @Test - public void testAcknowledgeAllRecordsProcessed() throws IOException { + void testAcknowledgeAllRecordsProcessed() throws IOException { InputChannelID inputChannelID = new InputChannelID(); ResultPartition resultPartition = @@ -91,12 +88,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger { resultPartition.notifyEndOfData(StopMode.DRAIN); CompletableFuture<Void> allRecordsProcessedFuture = resultPartition.getAllDataProcessedFuture(); - assertFalse(allRecordsProcessedFuture.isDone()); + assertThat(allRecordsProcessedFuture).isNotDone(); channel.writeInbound(new NettyMessage.AckAllUserRecordsProcessed(inputChannelID)); channel.runPendingTasks(); - assertTrue(allRecordsProcessedFuture.isDone()); - assertFalse(allRecordsProcessedFuture.isCompletedExceptionally()); + assertThat(allRecordsProcessedFuture).isDone().isNotCompletedExceptionally(); } @Test @@ -117,11 +113,11 @@ public class PartitionRequestServerHandlerTest extends TestLogger { channel.writeInbound(new NettyMessage.NewBufferSize(666, inputChannelID)); channel.runPendingTasks(); - assertEquals(666, testViewReader.bufferSize); + assertThat(testViewReader.bufferSize).isEqualTo(666); } @Test - public void testReceivingNewBufferSizeBeforeReaderIsCreated() { + void testReceivingNewBufferSizeBeforeReaderIsCreated() { final InputChannelID inputChannelID = new InputChannelID(); final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue(); final TestViewReader testViewReader = @@ -138,10 +134,12 @@ public class PartitionRequestServerHandlerTest extends TestLogger { channel.runPendingTasks(); // If error happens outbound messages would be not empty. - assertTrue(channel.outboundMessages().toString(), channel.outboundMessages().isEmpty()); + assertThat(channel.outboundMessages()) + .withFailMessage(channel.outboundMessages().toString()) + .isEmpty(); // New buffer size should be silently ignored because it is possible situation. - assertEquals(-1, testViewReader.bufferSize); + assertThat(testViewReader.bufferSize).isEqualTo(-1); } private static class TestViewReader extends CreditBasedSequenceNumberingViewReader { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 78bfa73e600..f5fdaf7a1b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -33,7 +33,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import java.util.Optional; @@ -43,17 +43,17 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ServerTransportErrorHandlingTest { +class ServerTransportErrorHandlingTest { /** Verifies remote closes trigger the release of all resources. */ @Test - public void testRemoteClose() throws Exception { + void testRemoteClose() throws Exception { final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16); final CountDownLatch sync = new CountDownLatch(1); @@ -103,12 +103,12 @@ public class ServerTransportErrorHandlingTest { new ResultPartitionID(), 0, new InputChannelID(), Integer.MAX_VALUE)); // Wait for the notification - if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) { - fail( - "Timed out after waiting for " - + TestingUtils.TESTING_DURATION.toMillis() - + " ms to be notified about released partition."); - } + assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) + .withFailMessage( + "Timed out after waiting for " + + TestingUtils.TESTING_DURATION.toMillis() + + " ms to be notified about released partition.") + .isTrue(); } finally { shutdown(serverAndClient); }