This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 023bf5f7ba HDDS-9392. Streaming write pipeline should pick the nearest 
datanode as primary node (#5414)
023bf5f7ba is described below

commit 023bf5f7ba6a02d9ba955f89bb67f88ee27c5cbd
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Dec 10 09:39:48 2023 +0800

    HDDS-9392. Streaming write pipeline should pick the nearest datanode as 
primary node (#5414)
---
 .../hadoop/hdds/scm/XceiverClientManager.java      | 27 +++++++++++++++-------
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  3 +--
 .../hdds/scm/storage/BlockDataStreamOutput.java    |  9 +++++---
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  | 20 ++++++++++++----
 ...lockLocationProtocolClientSideTranslatorPB.java |  2 ++
 .../client/io/BlockDataStreamOutputEntryPool.java  |  3 ++-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 10 +++++---
 7 files changed, 52 insertions(+), 22 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index c0a8b7421c..6b3e6cdbf4 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -274,16 +274,27 @@ public class XceiverClientManager implements Closeable, 
XceiverClientFactory {
   private String getPipelineCacheKey(Pipeline pipeline,
                                      boolean topologyAware) {
     String key = pipeline.getId().getId().toString() + pipeline.getType();
-    boolean isEC = pipeline.getReplicationConfig()
-        .getReplicationType() == HddsProtos.ReplicationType.EC;
+    boolean isEC = pipeline.getType() == HddsProtos.ReplicationType.EC;
     if (topologyAware || isEC) {
       try {
-        key += pipeline.getClosestNode().getHostName();
-        if (isEC) {
-          // Currently EC uses standalone client.
-          key += pipeline.getClosestNode()
-              .getPort(DatanodeDetails.Port.Name.STANDALONE);
-        }
+        DatanodeDetails closestNode = pipeline.getClosestNode();
+        // Pipeline cache key uses host:port suffix to handle
+        // EC, Ratis, and Standalone clients alike.
+        //
+        // For EC client, the host:port cache key is needed
+        // so that there is a different cache key for each node in
+        // a block group.
+        //
+        // For Ratis and Standalone client, the host:port cache key is needed
+        // to handle the possibility of two datanodes sharing the same machine.
+        // While normally one machine only hosts one datanode service,
+        // this situation might arise in tests.
+        //
+        // Standalone port is chosen since all datanodes should have a
+        // standalone port regardless of version and this port should not
+        // have any collisions.
+        key += closestNode.getHostName() + closestNode.getPort(
+            DatanodeDetails.Port.Name.STANDALONE);
       } catch (IOException e) {
         LOG.error("Failed to get closest node to create pipeline cache key:" +
             e.getMessage());
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 7be6214b32..72ed30b37f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -182,8 +182,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
       LOG.debug("Connecting to pipeline:{} leaderDatanode:{}, " +
           "primaryDatanode:{}", getPipeline().getId(),
           RatisHelper.toRaftPeerId(pipeline.getLeaderNode()),
-          // TODO (HDDS-9392): Update this to getClosestNode
-          RatisHelper.toRaftPeerId(pipeline.getFirstNode()));
+          RatisHelper.toRaftPeerId(pipeline.getClosestNode()));
     }
 
     if (!client.compareAndSet(null,
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 95f37eecb2..58bea6a99b 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -166,7 +166,7 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
         BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
             .addMetadata(keyValue);
     this.xceiverClient =
-        (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
+        (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline, true);
     this.token = token;
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream(pipeline);
@@ -198,7 +198,9 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
         ContainerProtos.WriteChunkRequestProto.newBuilder()
             .setBlockID(blockID.get().getDatanodeBlockIDProtobuf());
 
-    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    // TODO: The datanode UUID is not used meaningfully, consider deprecating
+    //  it or removing it completely if possible
+    String id = pipeline.getFirstNode().getUuidString();
     ContainerProtos.ContainerCommandRequestProto.Builder builder =
         ContainerProtos.ContainerCommandRequestProto.newBuilder()
             .setCmdType(ContainerProtos.Type.StreamInit)
@@ -607,7 +609,8 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
 
   public void cleanup(boolean invalidateClient) {
     if (xceiverClientFactory != null) {
-      xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+      xceiverClientFactory.releaseClient(xceiverClient, invalidateClient,
+          true);
     }
     xceiverClientFactory = null;
     xceiverClient = null;
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 3f92ec634a..e543c39c95 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -203,13 +203,23 @@ public final class RatisHelper {
         toRaftPeers(pipeline));
   }
 
+  /**
+   * Create a Raft client used primarily for Ozone client communications with
+   * the Ratis pipeline.
+   * @param rpcType rpc type
+   * @param pipeline pipeline
+   * @param retryPolicy retry policy
+   * @param tlsConfig tls config
+   * @param ozoneConfiguration configuration
+   * @return Raft client
+   * @throws IOException IOException
+   */
   public static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
         toRaftPeerId(pipeline.getLeaderNode()),
-        // TODO (HDDS-9392): Update this to getClosestNode
-        toRaftPeer(pipeline.getFirstNode()),
+        toRaftPeer(pipeline.getClosestNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
             pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
   }
@@ -448,14 +458,14 @@ public final class RatisHelper {
     RaftPeerId primaryId = null;
     List<RaftPeerId> raftPeers = new ArrayList<>();
 
-    for (DatanodeDetails dn : pipeline.getNodes()) {
+    for (DatanodeDetails dn : pipeline.getNodesInOrder()) {
       final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
       try {
-        if (dn == pipeline.getFirstNode()) {
+        if (dn == pipeline.getClosestNode()) {
           primaryId = raftPeerId;
         }
       } catch (IOException e) {
-        LOG.error("Can not get FirstNode from the pipeline: {} with " +
+        LOG.error("Can not get primary node from the pipeline: {} with " +
             "exception: {}", pipeline, e.getLocalizedMessage());
         return null;
       }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index dd59972ca2..b072c3690b 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -142,6 +142,8 @@ public final class 
ScmBlockLocationProtocolClientSideTranslatorPB
    * @param num               - number of blocks.
    * @param replicationConfig - replication configuration of the blocks.
    * @param excludeList       - exclude list while allocating blocks.
+   * @param clientMachine     - client address; depending on configuration,
+   *                            this can be a hostname or an IP address.
    * @return allocated block accessing info (key, pipeline).
    * @throws IOException
    */
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 8b8e8c474e..8e80b38104 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -82,7 +82,8 @@ public class BlockDataStreamOutputEntryPool implements 
KeyMetadataAware {
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
         
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
         .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
-        .setMultipartUploadPartNumber(partNumber).build();
+        .setMultipartUploadPartNumber(partNumber)
+        .setSortDatanodesInPipeline(true).build();
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = createExcludeList();
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c85e2ed389..6aed19fb9d 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1401,6 +1401,7 @@ public class RpcClient implements ClientProtocol {
         .setDataSize(size)
         .setReplicationConfig(replicationConfig)
         .addAllMetadataGdpr(metadata)
+        .setSortDatanodesInPipeline(true)
         .setAcls(getAclList());
 
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
@@ -1797,7 +1798,8 @@ public class RpcClient implements ClientProtocol {
 
   private OpenKeySession newMultipartOpenKey(
       String volumeName, String bucketName, String keyName,
-      long size, int partNumber, String uploadID) throws IOException {
+      long size, int partNumber, String uploadID,
+      boolean sortDatanodesInPipeline) throws IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
     if (checkKeyNameEnabled) {
@@ -1819,6 +1821,7 @@ public class RpcClient implements ClientProtocol {
         .setIsMultipartKey(true)
         .setMultipartUploadID(uploadID)
         .setMultipartUploadPartNumber(partNumber)
+        .setSortDatanodesInPipeline(sortDatanodesInPipeline)
         .setAcls(getAclList())
         .build();
     return ozoneManagerClient.openKey(keyArgs);
@@ -1829,7 +1832,7 @@ public class RpcClient implements ClientProtocol {
       String volumeName, String bucketName, String keyName,
       long size, int partNumber, String uploadID) throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
-        volumeName, bucketName, keyName, size, partNumber, uploadID);
+        volumeName, bucketName, keyName, size, partNumber, uploadID, false);
     KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .setMultipartNumber(partNumber)
         .setMultipartUploadID(uploadID)
@@ -1849,7 +1852,7 @@ public class RpcClient implements ClientProtocol {
       String uploadID)
       throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
-        volumeName, bucketName, keyName, size, partNumber, uploadID);
+        volumeName, bucketName, keyName, size, partNumber, uploadID, true);
     // Amazon S3 never adds partial objects, So for S3 requests we need to
     // set atomicKeyCreation to true
     // refer: 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
@@ -2099,6 +2102,7 @@ public class RpcClient implements ClientProtocol {
         .setReplicationConfig(replicationConfig)
         .setAcls(getAclList())
         .setLatestVersionLocation(getLatestVersionLocation)
+        .setSortDatanodesInPipeline(true)
         .build();
     OpenKeySession keySession =
         ozoneManagerClient.createFile(keyArgs, overWrite, recursive);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to