This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e1c75638af65210c2f780996431dbc38f47517a0 Author: Weijie Guo <res...@163.com> AuthorDate: Wed Nov 16 21:05:24 2022 +0800 [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ. (cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1) --- .../ResultPartitionDeploymentDescriptorTest.java | 39 ++++---- .../netty/ClientTransportErrorHandlingTest.java | 106 ++++++++++----------- .../netty/PartitionRequestClientFactoryTest.java | 99 ++++++++++--------- 3 files changed, 122 insertions(+), 122 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 33c79a1be4a..1fe6b9e329e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -31,20 +31,17 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; -import org.apache.flink.util.TestLogger; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.InetSocketAddress; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link ResultPartitionDeploymentDescriptor}. */ -public class ResultPartitionDeploymentDescriptorTest extends TestLogger { +class ResultPartitionDeploymentDescriptorTest { private static final IntermediateDataSetID resultId = new IntermediateDataSetID(); private static final int numberOfPartitions = 5; @@ -78,18 +75,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */ @Test - public void testSerializationOfUnknownShuffleDescriptor() throws IOException { + void testSerializationOfUnknownShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID); ShuffleDescriptor shuffleDescriptorCopy = CommonTestUtils.createCopySerializable(shuffleDescriptor); - assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class)); - assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); - assertThat(shuffleDescriptorCopy.isUnknown(), is(true)); + assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); + assertThat(shuffleDescriptorCopy.isUnknown()).isTrue(); } /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. */ @Test - public void testSerializationWithNettyShuffleDescriptor() throws IOException { + void testSerializationWithNettyShuffleDescriptor() throws IOException { ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor( producerLocation, @@ -99,13 +96,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { ResultPartitionDeploymentDescriptor copy = createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor); - assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class)); + assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class); NettyShuffleDescriptor shuffleDescriptorCopy = (NettyShuffleDescriptor) copy.getShuffleDescriptor(); - assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID)); - assertThat(shuffleDescriptorCopy.isUnknown(), is(false)); - assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true)); - assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID)); + assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID()); + assertThat(shuffleDescriptorCopy.isUnknown()).isFalse(); + assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation)).isTrue(); + assertThat(connectionID).isEqualTo(shuffleDescriptorCopy.getConnectionId()); } private static ResultPartitionDeploymentDescriptor @@ -121,10 +118,10 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger { private static void verifyResultPartitionDeploymentDescriptorCopy( ResultPartitionDeploymentDescriptor copy) { - assertThat(copy.getResultId(), is(resultId)); - assertThat(copy.getTotalNumberOfPartitions(), is(numberOfPartitions)); - assertThat(copy.getPartitionId(), is(partitionId)); - assertThat(copy.getPartitionType(), is(partitionType)); - assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions)); + assertThat(resultId).isEqualTo(copy.getResultId()); + assertThat(numberOfPartitions).isEqualTo(copy.getTotalNumberOfPartitions()); + assertThat(partitionId).isEqualTo(copy.getPartitionId()); + assertThat(partitionType).isEqualTo(copy.getPartitionType()); + assertThat(numberOfSubpartitions).isEqualTo(copy.getNumberOfSubpartitions()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index f32e0a9b590..5830620d893 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAda import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -52,11 +52,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; @@ -67,14 +64,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ClientTransportErrorHandlingTest { +class ClientTransportErrorHandlingTest { /** * Verifies that failed client requests via {@link PartitionRequestClient} are correctly * attributed to the respective {@link RemoteInputChannel}. */ @Test - public void testExceptionOnWrite() throws Exception { + void testExceptionOnWrite() throws Exception { NettyProtocol protocol = new NettyProtocol( @@ -146,14 +143,13 @@ public class ClientTransportErrorHandlingTest { // Second request is *not* successful requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0); - // Wait for the notification and it could confirm all the request operations are done - if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) { - fail( - "Timed out after waiting for " - + TestingUtils.TESTING_DURATION.toMillis() - + " ms to be notified about the channel error."); - } + assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) + .withFailMessage( + "Timed out after waiting for " + + TestingUtils.TESTING_DURATION.toMillis() + + " ms to be notified about the channel error.") + .isTrue(); // Only the second channel should be notified about the error verify(rich[0], times(0)).onError(any(LocalTransportException.class)); @@ -166,7 +162,7 @@ public class ClientTransportErrorHandlingTest { * RemoteTransportException} instances. */ @Test - public void testWrappingOfRemoteErrorMessage() throws Exception { + void testWrappingOfRemoteErrorMessage() throws Exception { EmbeddedChannel ch = createEmbeddedChannel(); NetworkClientHandler handler = getClientHandler(ch); @@ -187,14 +183,12 @@ public class ClientTransportErrorHandlingTest { new RuntimeException("Expected test exception"), rich[0].getInputChannelId())); - try { - // Exception should not reach end of pipeline... - ch.checkException(); - } catch (Exception e) { - fail( - "The exception reached the end of the pipeline and " - + "was not handled correctly by the last handler."); - } + // Exception should not reach end of pipeline... + assertThatNoException() + .describedAs( + "The exception reached the end of the pipeline and " + + "was not handled correctly by the last handler.") + .isThrownBy(ch::checkException); verify(rich[0], times(1)).onError(isA(RemoteTransportException.class)); verify(rich[1], never()).onError(any(Throwable.class)); @@ -205,14 +199,12 @@ public class ClientTransportErrorHandlingTest { new NettyMessage.ErrorResponse( new RuntimeException("Expected test exception"))); - try { - // Exception should not reach end of pipeline... - ch.checkException(); - } catch (Exception e) { - fail( - "The exception reached the end of the pipeline and " - + "was not handled correctly by the last handler."); - } + // Exception should not reach end of pipeline... + assertThatNoException() + .describedAs( + "The exception reached the end of the pipeline and " + + "was not handled correctly by the last handler.") + .isThrownBy(ch::checkException); verify(rich[0], times(2)).onError(isA(RemoteTransportException.class)); verify(rich[1], times(1)).onError(isA(RemoteTransportException.class)); @@ -223,7 +215,7 @@ public class ClientTransportErrorHandlingTest { * RemoteTransportException}. */ @Test - public void testExceptionOnRemoteClose() throws Exception { + void testExceptionOnRemoteClose() throws Exception { NettyProtocol protocol = new NettyProtocol( @@ -275,12 +267,12 @@ public class ClientTransportErrorHandlingTest { ch.writeAndFlush(Unpooled.buffer().writerIndex(16)); // Wait for the notification - if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) { - fail( - "Timed out after waiting for " - + TestingUtils.TESTING_DURATION.toMillis() - + " ms to be notified about remote connection close."); - } + assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) + .withFailMessage( + "Timed out after waiting for " + + TestingUtils.TESTING_DURATION.toMillis() + + " ms to be notified about remote connection close.") + .isTrue(); // All the registered channels should be notified. for (RemoteInputChannel r : rich) { @@ -292,7 +284,7 @@ public class ClientTransportErrorHandlingTest { /** Verifies that fired Exceptions are handled correctly by the pipeline. */ @Test - public void testExceptionCaught() throws Exception { + void testExceptionCaught() throws Exception { EmbeddedChannel ch = createEmbeddedChannel(); NetworkClientHandler handler = getClientHandler(ch); @@ -308,14 +300,12 @@ public class ClientTransportErrorHandlingTest { ch.pipeline().fireExceptionCaught(new Exception()); - try { - // Exception should not reach end of pipeline... - ch.checkException(); - } catch (Exception e) { - fail( - "The exception reached the end of the pipeline and " - + "was not handled correctly by the last handler."); - } + // Exception should not reach end of pipeline... + assertThatNoException() + .describedAs( + "The exception reached the end of the pipeline and " + + "was not handled correctly by the last handler.") + .isThrownBy(ch::checkException); // ...but all the registered channels should be notified. for (RemoteInputChannel r : rich) { @@ -328,7 +318,7 @@ public class ClientTransportErrorHandlingTest { * instance of {@link RemoteTransportException}. */ @Test - public void testConnectionResetByPeer() throws Throwable { + void testConnectionResetByPeer() throws Throwable { EmbeddedChannel ch = createEmbeddedChannel(); NetworkClientHandler handler = getClientHandler(ch); @@ -345,13 +335,13 @@ public class ClientTransportErrorHandlingTest { Throwable cause = (Throwable) invocation.getArguments()[0]; try { - assertEquals(RemoteTransportException.class, cause.getClass()); - assertNotEquals("Connection reset by peer", cause.getMessage()); + assertThat(cause).isInstanceOf(RemoteTransportException.class); + assertThat(cause) + .hasMessageNotContaining("Connection reset by peer"); - assertEquals(IOException.class, cause.getCause().getClass()); - assertEquals( - "Connection reset by peer", - cause.getCause().getMessage()); + assertThat(cause.getCause()).isInstanceOf(IOException.class); + assertThat(cause.getCause()) + .hasMessage("Connection reset by peer"); } catch (Throwable t) { error[0] = t; } @@ -364,12 +354,12 @@ public class ClientTransportErrorHandlingTest { ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer")); - assertNull(error[0]); + assertThat(error[0]).isNull(); } /** Verifies that the channel is closed if there is an error *during* error notification. */ @Test - public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception { + void testChannelClosedOnExceptionDuringErrorNotification() throws Exception { EmbeddedChannel ch = createEmbeddedChannel(); NetworkClientHandler handler = getClientHandler(ch); @@ -382,7 +372,7 @@ public class ClientTransportErrorHandlingTest { ch.pipeline().fireExceptionCaught(new Exception()); - assertFalse(ch.isActive()); + assertThat(ch.isActive()).isFalse(); } // --------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 968b824d541..3644b61e632 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; @@ -31,13 +34,15 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -49,23 +54,22 @@ import java.util.concurrent.Future; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; /** {@link PartitionRequestClientFactory} test. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class PartitionRequestClientFactoryTest extends TestLogger { - @Parameterized.Parameter public boolean connectionReuseEnabled; + @Parameter public boolean connectionReuseEnabled; - @Parameterized.Parameters(name = "connection reuse enabled = {0}") - public static Object[] parameters() { - return new Object[][] {new Object[] {true}, new Object[] {false}}; + @Parameters(name = "connectionReuseEnabled={0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); } - @Test - public void testInterruptsNotCached() throws Exception { + @TestTemplate + void testInterruptsNotCached() throws Exception { NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient(); try { AwaitingNettyClient nettyClient = @@ -106,8 +110,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger { interrupted.get(); } - @Test - public void testExceptionsAreNotCached() throws Exception { + @TestTemplate + void testExceptionsAreNotCached() throws Exception { NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient(); try { @@ -117,12 +121,9 @@ public class PartitionRequestClientFactoryTest extends TestLogger { connectionReuseEnabled); final ConnectionID connectionID = nettyServerAndClient.getConnectionID(0); - try { - factory.createPartitionRequestClient(connectionID); - fail("Expected the first request to fail."); - } catch (RemoteTransportException expected) { - // expected - } + assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID)) + .withFailMessage("Expected the first request to fail.") + .isInstanceOf(RemoteTransportException.class); factory.createPartitionRequestClient(connectionID); } finally { @@ -131,8 +132,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger { } } - @Test - public void testReuseNettyPartitionRequestClient() throws Exception { + @TestTemplate + void testReuseNettyPartitionRequestClient() throws Exception { NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient(); try { checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1); @@ -161,7 +162,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE)); set.add(factory.createPartitionRequestClient(connectionID)); } - assertTrue(set.size() <= maxNumberOfConnections); + assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections); } /** @@ -214,8 +215,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger { shutdown(serverAndClient); } - @Test - public void testNettyClientConnectRetry() throws Exception { + @TestTemplate + void testNettyClientConnectRetry() throws Exception { NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient(); UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2); @@ -231,23 +232,31 @@ public class PartitionRequestClientFactoryTest extends TestLogger { } // see https://issues.apache.org/jira/browse/FLINK-18821 - @Test(expected = IOException.class) - public void testFailureReportedToSubsequentRequests() throws Exception { + @TestTemplate + void testFailureReportedToSubsequentRequests() throws Exception { PartitionRequestClientFactory factory = new PartitionRequestClientFactory( new FailingNettyClient(), 2, 1, connectionReuseEnabled); - try { - factory.createPartitionRequestClient( - new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0)); - } catch (Exception e) { - // expected - } - factory.createPartitionRequestClient( - new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0)); + + assertThatThrownBy( + () -> + factory.createPartitionRequestClient( + new ConnectionID( + new InetSocketAddress(InetAddress.getLocalHost(), 8080), + 0))); + + assertThatThrownBy( + () -> + factory.createPartitionRequestClient( + new ConnectionID( + new InetSocketAddress( + InetAddress.getLocalHost(), 8080), + 0))) + .isInstanceOf(IOException.class); } - @Test(expected = IOException.class) - public void testNettyClientConnectRetryFailure() throws Exception { + @TestTemplate + void testNettyClientConnectRetryFailure() throws Exception { NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient(); UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 3); @@ -257,16 +266,20 @@ public class PartitionRequestClientFactoryTest extends TestLogger { new PartitionRequestClientFactory( unstableNettyClient, 2, 1, connectionReuseEnabled); - factory.createPartitionRequestClient(serverAndClient.getConnectionID(0)); - + assertThatThrownBy( + () -> { + factory.createPartitionRequestClient( + serverAndClient.getConnectionID(0)); + }) + .isInstanceOf(IOException.class); } finally { serverAndClient.client().shutdown(); serverAndClient.server().shutdown(); } } - @Test - public void testNettyClientConnectRetryMultipleThread() throws Exception { + @TestTemplate + void testNettyClientConnectRetryMultipleThread() throws Exception { NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient(); UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2); @@ -301,10 +314,10 @@ public class PartitionRequestClientFactoryTest extends TestLogger { NettyPartitionRequestClient client; try { client = runnableFuture.get(); - assertNotNull(client); + assertThat(client).isNotNull(); } catch (Exception e) { System.out.println(e.getMessage()); - fail(); + fail(e.getMessage()); } });