This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9119ed0  HDDS-1384. TestBlockOutputStreamWithFailures is failing
9119ed0 is described below

commit 9119ed07ff32143b548316bf69c49695196f8422
Author: Márton Elek <e...@apache.org>
AuthorDate: Thu Jul 11 12:46:39 2019 +0200

    HDDS-1384. TestBlockOutputStreamWithFailures is failing
    
    Closes #1029
---
 .../common/transport/server/XceiverServerGrpc.java | 37 +++++++-------
 .../transport/server/ratis/XceiverServerRatis.java | 38 ++++++++-------
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  | 56 ++++++++++++++++------
 3 files changed, 83 insertions(+), 48 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 3f262a1..78c941e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.ozone.container.common.transport.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -51,9 +52,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -71,6 +69,8 @@ public final class XceiverServerGrpc extends XceiverServer {
   private Server server;
   private final ContainerDispatcher storageContainer;
   private boolean isStarted;
+  private DatanodeDetails datanodeDetails;
+
 
   /**
    * Constructs a Grpc server class.
@@ -84,25 +84,15 @@ public final class XceiverServerGrpc extends XceiverServer {
     Preconditions.checkNotNull(conf);
 
     this.id = datanodeDetails.getUuid();
+    this.datanodeDetails = datanodeDetails;
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    // Get an available port on current node and
-    // use that as the container port
+
     if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        this.port = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", this.port);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", this.port, e);
-      }
+      this.port = 0;
     }
-    datanodeDetails.setPort(
-        DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
+
     NettyServerBuilder nettyServerBuilder =
         ((NettyServerBuilder) ServerBuilder.forPort(port))
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
@@ -165,6 +155,19 @@ public final class XceiverServerGrpc extends XceiverServer 
{
   public void start() throws IOException {
     if (!isStarted) {
       server.start();
+      int realPort = server.getPort();
+
+      if (port == 0) {
+        LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
+            this.id, realPort);
+        port = realPort;
+      }
+
+      //register the real port to the datanode details.
+      datanodeDetails.setPort(DatanodeDetails
+          .newPort(Name.STANDALONE,
+              realPort));
+
       isStarted = true;
     }
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index f6ecb54..72f6ab4 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -66,6 +66,7 @@ import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -73,9 +74,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -101,7 +99,7 @@ public final class XceiverServerRatis extends XceiverServer {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
-  private final int port;
+  private int port;
   private final RaftServer server;
   private ThreadPoolExecutor chunkExecutor;
   private final List<ExecutorService> executors;
@@ -112,6 +110,7 @@ public final class XceiverServerRatis extends XceiverServer 
{
   private long nodeFailureTimeoutMs;
   private final long cacheEntryExpiryInteval;
   private boolean isStarted = false;
+  private DatanodeDetails datanodeDetails;
 
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext
@@ -119,6 +118,7 @@ public final class XceiverServerRatis extends XceiverServer 
{
       throws IOException {
     super(conf, caClient);
     Objects.requireNonNull(dd, "id == null");
+    datanodeDetails = dd;
     this.port = port;
     RaftProperties serverProperties = newRaftProperties(conf);
     final int numWriteChunkThreads = conf.getInt(
@@ -403,21 +403,11 @@ public final class XceiverServerRatis extends 
XceiverServer {
     if (ozoneConf.getBoolean(OzoneConfigKeys
             .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        localPort = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", localPort);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", localPort, e);
-      }
+      localPort = 0;
     }
     GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
           new SecurityConfig(ozoneConf));
-    datanodeDetails.setPort(
-        DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
+
     return new XceiverServerRatis(datanodeDetails, localPort,
         dispatcher, ozoneConf, context, tlsConfig, caClient);
   }
@@ -429,6 +419,22 @@ public final class XceiverServerRatis extends 
XceiverServer {
           server.getId(), getIPCPort());
       chunkExecutor.prestartAllCoreThreads();
       server.start();
+
+      int realPort =
+          ((RaftServerProxy) server).getServerRpc().getInetSocketAddress()
+              .getPort();
+
+      if (port == 0) {
+        LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
+            server.getId(), realPort);
+        port = realPort;
+      }
+
+      //register the real port to the datanode details.
+      datanodeDetails.setPort(DatanodeDetails
+          .newPort(DatanodeDetails.Port.Name.RATIS,
+              realPort));
+
       isStarted = true;
     }
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index f3a5d2c..570fc00 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -23,6 +23,7 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -194,25 +195,50 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
-    try (
-        DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null, null);
-        DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null, null);
-        DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null, null)
-    ) {
+    List<DatanodeStateMachine> stateMachines = new ArrayList<>();
+    try {
+
+      for (int i = 0; i < 3; i++) {
+        stateMachines.add(new DatanodeStateMachine(
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null));
+      }
+
+      //we need to start all the servers to get the fix ports
+      for (DatanodeStateMachine dsm : stateMachines) {
+        dsm.getContainer().getReadChannel().start();
+        dsm.getContainer().getWriteChannel().start();
+
+      }
+
+      for (DatanodeStateMachine dsm : stateMachines) {
+        dsm.getContainer().getWriteChannel().stop();
+        dsm.getContainer().getReadChannel().stop();
+
+      }
+
+      //after the start the real port numbers should be available AND unique
       HashSet<Integer> ports = new HashSet<Integer>();
-      assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
-      assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort()));
-      assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
+      for (DatanodeStateMachine dsm : stateMachines) {
+        int readPort = dsm.getContainer().getReadChannel().getIPCPort();
+
+        assertNotEquals("Port number of the service is not updated", 0,
+            readPort);
 
-      // Assert that ratis is also on a different port.
-      assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort()));
-      assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort()));
-      assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort()));
+        assertTrue("Port of datanode service is conflicted with other server.",
+            ports.add(readPort));
 
+        int writePort = dsm.getContainer().getWriteChannel().getIPCPort();
 
+        assertNotEquals("Port number of the service is not updated", 0,
+            writePort);
+        assertTrue("Port of datanode service is conflicted with other server.",
+            ports.add(writePort));
+      }
+
+    } finally {
+      for (DatanodeStateMachine dsm : stateMachines) {
+        dsm.close();
+      }
     }
 
     // Turn off the random port flag and test again


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to