This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e705090c89e [FLINK-30678][tests] Use random port e705090c89e is described below commit e705090c89ef9411304d54c57eac2f0080c522a7 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Jan 19 10:48:23 2023 +0100 [FLINK-30678][tests] Use random port --- .../apache/flink/client/program/ClientTest.java | 13 ------ .../flink/queryablestate/network/ClientTest.java | 8 +--- .../network/netty/NettyConnectionManagerTest.java | 11 ++---- .../netty/NettyPartitionRequestClientTest.java | 25 +++++------- .../runtime/taskexecutor/TaskExecutorTest.java | 46 ++++++++++------------ 5 files changed, 36 insertions(+), 67 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 4d8f4403e81..6efc8fb0bd1 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -54,9 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.NetUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -88,8 +86,6 @@ class ClientTest { private Plan plan; - private NetUtils.Port port; - private Configuration config; private static final String TEST_EXECUTOR_NAME = "test_executor"; @@ -108,20 +104,11 @@ class ClientTest { config = new Configuration(); config.setString(JobManagerOptions.ADDRESS, "localhost"); - port = NetUtils.getAvailablePort(); - config.setInteger(JobManagerOptions.PORT, port.getPort()); config.set( AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); } - @AfterEach - void tearDown() throws Exception { - if (port != null) { - port.close(); - } - } - private Configuration fromPackagedProgram( final PackagedProgram program, final int parallelism, final boolean detached) { final Configuration configuration = new Configuration(); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 383fe41ee18..193508ce16f 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; @@ -255,13 +254,10 @@ public class ClientTest extends TestLogger { Client<KvStateInternalRequest, KvStateResponse> client = null; - try (NetUtils.Port port = NetUtils.getAvailablePort()) { + try { client = new Client<>("Test Client", 1, serializer, stats); - int availablePort = port.getPort(); - - InetSocketAddress serverAddress = - new InetSocketAddress(InetAddress.getLocalHost(), availablePort); + InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0); KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java index d313c594d13..f93381db59d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; @@ -48,11 +47,11 @@ public class NettyConnectionManagerTest { // Expected number of arenas and threads int numberOfSlots = 2; NettyConnectionManager connectionManager; - try (NetUtils.Port port = NetUtils.getAvailablePort()) { + { NettyConfig config = new NettyConfig( InetAddress.getLocalHost(), - port.getPort(), + 0, 1024, numberOfSlots, new Configuration()); @@ -117,11 +116,9 @@ public class NettyConnectionManagerTest { flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4); NettyConnectionManager connectionManager; - try (NetUtils.Port port = NetUtils.getAvailablePort()) { - + { NettyConfig config = - new NettyConfig( - InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig); + new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1337, flinkConfig); connectionManager = createNettyConnectionManager(config); connectionManager.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java index 6ede9dbdc90..ad34bdf363e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; @@ -285,20 +284,16 @@ public class NettyPartitionRequestClientTest { private NettyPartitionRequestClient createPartitionRequestClient( Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled) throws Exception { - try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) { - int port = availablePort.getPort(); - ConnectionID connectionID = - new ConnectionID( - ResourceID.generate(), new InetSocketAddress("localhost", port), 0); - NettyConfig config = - new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration()); - NettyClient nettyClient = new NettyClient(config); - PartitionRequestClientFactory partitionRequestClientFactory = - new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled); - - return new NettyPartitionRequestClient( - tcpChannel, clientHandler, connectionID, partitionRequestClientFactory); - } + ConnectionID connectionID = + new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0); + NettyConfig config = + new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1, new Configuration()); + NettyClient nettyClient = new NettyClient(config); + PartitionRequestClientFactory partitionRequestClientFactory = + new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled); + + return new NettyPartitionRequestClient( + tcpChannel, clientHandler, connectionID, partitionRequestClientFactory); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index cc857ba0e1f..6d3a885ca43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -109,7 +109,6 @@ import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.NetUtils; import org.apache.flink.util.Reference; import org.apache.flink.util.TestLogger; import org.apache.flink.util.TimeUtils; @@ -2244,31 +2243,26 @@ public class TaskExecutorTest extends TestLogger { @Test(timeout = 10000L) public void testLogNotFoundHandling() throws Throwable { - try (NetUtils.Port port = NetUtils.getAvailablePort()) { - int dataPort = port.getPort(); - - configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort); - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); - configuration.setInteger( - NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); - configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); - - try (TaskSubmissionTestEnvironment env = - new Builder(jobId) - .setConfiguration(configuration) - .setLocalCommunication(false) - .build(EXECUTOR_RESOURCE.getExecutor())) { - TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); - try { - CompletableFuture<TransientBlobKey> logFuture = - tmGateway.requestFileUploadByType(FileType.LOG, timeout); - logFuture.get(); - } catch (Exception e) { - assertThat( - e.getMessage(), - containsString("The file LOG does not exist on the TaskExecutor.")); - } + configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 0); + configuration.setInteger( + NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); + configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); + + try (TaskSubmissionTestEnvironment env = + new Builder(jobId) + .setConfiguration(configuration) + .setLocalCommunication(false) + .build(EXECUTOR_RESOURCE.getExecutor())) { + TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); + try { + CompletableFuture<TransientBlobKey> logFuture = + tmGateway.requestFileUploadByType(FileType.LOG, timeout); + logFuture.get(); + } catch (Exception e) { + assertThat( + e.getMessage(), + containsString("The file LOG does not exist on the TaskExecutor.")); } } }