This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 9119ed0 HDDS-1384. TestBlockOutputStreamWithFailures is failing 9119ed0 is described below commit 9119ed07ff32143b548316bf69c49695196f8422 Author: Márton Elek <e...@apache.org> AuthorDate: Thu Jul 11 12:46:39 2019 +0200 HDDS-1384. TestBlockOutputStreamWithFailures is failing Closes #1029 --- .../common/transport/server/XceiverServerGrpc.java | 37 +++++++------- .../transport/server/ratis/XceiverServerRatis.java | 38 ++++++++------- .../apache/hadoop/ozone/TestMiniOzoneCluster.java | 56 ++++++++++++++++------ 3 files changed, 83 insertions(+), 48 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 3f262a1..78c941e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -51,9 +52,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -71,6 +69,8 @@ public final class XceiverServerGrpc 
extends XceiverServer { private Server server; private final ContainerDispatcher storageContainer; private boolean isStarted; + private DatanodeDetails datanodeDetails; + /** * Constructs a Grpc server class. @@ -84,25 +84,15 @@ public final class XceiverServerGrpc extends XceiverServer { Preconditions.checkNotNull(conf); this.id = datanodeDetails.getUuid(); + this.datanodeDetails = datanodeDetails; this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - // Get an available port on current node and - // use that as the container port + if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - this.port = socket.getLocalPort(); - LOG.info("Found a free port for the server : {}", this.port); - } catch (IOException e) { - LOG.error("Unable find a random free port for the server, " - + "fallback to use default port {}", this.port, e); - } + this.port = 0; } - datanodeDetails.setPort( - DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port)); + NettyServerBuilder nettyServerBuilder = ((NettyServerBuilder) ServerBuilder.forPort(port)) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); @@ -165,6 +155,19 @@ public final class XceiverServerGrpc extends XceiverServer { public void start() throws IOException { if (!isStarted) { server.start(); + int realPort = server.getPort(); + + if (port == 0) { + LOG.info("{} {} is started using port {}", getClass().getSimpleName(), + this.id, realPort); + port = realPort; + } + + //register the real port to the datanode details. 
+ datanodeDetails.setPort(DatanodeDetails + .newPort(Name.STANDALONE, + realPort)); + isStarted = true; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f6ecb54..72f6ab4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -66,6 +66,7 @@ import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -73,9 +74,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -101,7 +99,7 @@ public final class XceiverServerRatis extends XceiverServer { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; } - private final int port; + private int port; private final RaftServer server; private ThreadPoolExecutor chunkExecutor; private final List<ExecutorService> executors; @@ -112,6 +110,7 @@ public final class XceiverServerRatis extends XceiverServer { private long nodeFailureTimeoutMs; private final long cacheEntryExpiryInteval; private boolean isStarted = false; + private DatanodeDetails datanodeDetails; private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, 
Configuration conf, StateContext @@ -119,6 +118,7 @@ public final class XceiverServerRatis extends XceiverServer { throws IOException { super(conf, caClient); Objects.requireNonNull(dd, "id == null"); + datanodeDetails = dd; this.port = port; RaftProperties serverProperties = newRaftProperties(conf); final int numWriteChunkThreads = conf.getInt( @@ -403,21 +403,11 @@ public final class XceiverServerRatis extends XceiverServer { if (ozoneConf.getBoolean(OzoneConfigKeys .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - localPort = socket.getLocalPort(); - LOG.info("Found a free port for the server : {}", localPort); - } catch (IOException e) { - LOG.error("Unable find a random free port for the server, " - + "fallback to use default port {}", localPort, e); - } + localPort = 0; } GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig( new SecurityConfig(ozoneConf)); - datanodeDetails.setPort( - DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort)); + return new XceiverServerRatis(datanodeDetails, localPort, dispatcher, ozoneConf, context, tlsConfig, caClient); } @@ -429,6 +419,22 @@ public final class XceiverServerRatis extends XceiverServer { server.getId(), getIPCPort()); chunkExecutor.prestartAllCoreThreads(); server.start(); + + int realPort = + ((RaftServerProxy) server).getServerRpc().getInetSocketAddress() + .getPort(); + + if (port == 0) { + LOG.info("{} {} is started using port {}", getClass().getSimpleName(), + server.getId(), realPort); + port = realPort; + } + + //register the real port to the datanode details. 
+ datanodeDetails.setPort(DatanodeDetails + .newPort(DatanodeDetails.Port.Name.RATIS, + realPort)); + isStarted = true; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index f3a5d2c..570fc00 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -194,25 +195,50 @@ public class TestMiniOzoneCluster { ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true); ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true); - try ( - DatanodeStateMachine sm1 = new DatanodeStateMachine( - TestUtils.randomDatanodeDetails(), ozoneConf, null, null); - DatanodeStateMachine sm2 = new DatanodeStateMachine( - TestUtils.randomDatanodeDetails(), ozoneConf, null, null); - DatanodeStateMachine sm3 = new DatanodeStateMachine( - TestUtils.randomDatanodeDetails(), ozoneConf, null, null) - ) { + List<DatanodeStateMachine> stateMachines = new ArrayList<>(); + try { + + for (int i = 0; i < 3; i++) { + stateMachines.add(new DatanodeStateMachine( + TestUtils.randomDatanodeDetails(), ozoneConf, null, null)); + } + + //we need to start all the servers to get the fix ports + for (DatanodeStateMachine dsm : stateMachines) { + dsm.getContainer().getReadChannel().start(); + dsm.getContainer().getWriteChannel().start(); + + } + + for (DatanodeStateMachine 
dsm : stateMachines) { + dsm.getContainer().getWriteChannel().stop(); + dsm.getContainer().getReadChannel().stop(); + + } + + //after the start the real port numbers should be available AND unique HashSet<Integer> ports = new HashSet<Integer>(); - assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort())); - assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort())); - assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort())); + for (DatanodeStateMachine dsm : stateMachines) { + int readPort = dsm.getContainer().getReadChannel().getIPCPort(); + + assertNotEquals("Port number of the service is not updated", 0, + readPort); - // Assert that ratis is also on a different port. - assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort())); - assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort())); - assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort())); + assertTrue("Port of datanode service is conflicted with other server.", + ports.add(readPort)); + int writePort = dsm.getContainer().getWriteChannel().getIPCPort(); + assertNotEquals("Port number of the service is not updated", 0, + writePort); + assertTrue("Port of datanode service is conflicted with other server.", + ports.add(writePort)); + } + + } finally { + for (DatanodeStateMachine dsm : stateMachines) { + dsm.close(); + } } // Turn off the random port flag and test again --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org