This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 023bf5f7ba HDDS-9392. Streaming write pipeline should pick the nearest
datanode as primary node (#5414)
023bf5f7ba is described below
commit 023bf5f7ba6a02d9ba955f89bb67f88ee27c5cbd
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Dec 10 09:39:48 2023 +0800
HDDS-9392. Streaming write pipeline should pick the nearest datanode as
primary node (#5414)
---
.../hadoop/hdds/scm/XceiverClientManager.java | 27 +++++++++++++++-------
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 3 +--
.../hdds/scm/storage/BlockDataStreamOutput.java | 9 +++++---
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 20 ++++++++++++----
...lockLocationProtocolClientSideTranslatorPB.java | 2 ++
.../client/io/BlockDataStreamOutputEntryPool.java | 3 ++-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 10 +++++---
7 files changed, 52 insertions(+), 22 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index c0a8b7421c..6b3e6cdbf4 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -274,16 +274,27 @@ public class XceiverClientManager implements Closeable,
XceiverClientFactory {
private String getPipelineCacheKey(Pipeline pipeline,
boolean topologyAware) {
String key = pipeline.getId().getId().toString() + pipeline.getType();
- boolean isEC = pipeline.getReplicationConfig()
- .getReplicationType() == HddsProtos.ReplicationType.EC;
+ boolean isEC = pipeline.getType() == HddsProtos.ReplicationType.EC;
if (topologyAware || isEC) {
try {
- key += pipeline.getClosestNode().getHostName();
- if (isEC) {
- // Currently EC uses standalone client.
- key += pipeline.getClosestNode()
- .getPort(DatanodeDetails.Port.Name.STANDALONE);
- }
+ DatanodeDetails closestNode = pipeline.getClosestNode();
+      // Pipeline cache key uses host:port suffix to handle
+      // the EC, Ratis, and Standalone clients.
+ //
+ // For EC client, the host:port cache key is needed
+ // so that there is a different cache key for each node in
+ // a block group.
+ //
+ // For Ratis and Standalone client, the host:port cache key is needed
+ // to handle the possibility of two datanodes sharing the same machine.
+ // While normally one machine only hosts one datanode service,
+ // this situation might arise in tests.
+ //
+ // Standalone port is chosen since all datanodes should have a
+ // standalone port regardless of version and this port should not
+ // have any collisions.
+ key += closestNode.getHostName() + closestNode.getPort(
+ DatanodeDetails.Port.Name.STANDALONE);
} catch (IOException e) {
LOG.error("Failed to get closest node to create pipeline cache key:" +
e.getMessage());
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 7be6214b32..72ed30b37f 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -182,8 +182,7 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
LOG.debug("Connecting to pipeline:{} leaderDatanode:{}, " +
"primaryDatanode:{}", getPipeline().getId(),
RatisHelper.toRaftPeerId(pipeline.getLeaderNode()),
- // TODO (HDDS-9392): Update this to getClosestNode
- RatisHelper.toRaftPeerId(pipeline.getFirstNode()));
+ RatisHelper.toRaftPeerId(pipeline.getClosestNode()));
}
if (!client.compareAndSet(null,
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 95f37eecb2..58bea6a99b 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -166,7 +166,7 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClient =
- (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
+ (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline, true);
this.token = token;
// Alternatively, stream setup can be delayed till the first chunk write.
this.out = setupStream(pipeline);
@@ -198,7 +198,9 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
ContainerProtos.WriteChunkRequestProto.newBuilder()
.setBlockID(blockID.get().getDatanodeBlockIDProtobuf());
- String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    // TODO: The datanode UUID is not used meaningfully; consider deprecating
+    // it or removing it completely if possible.
+ String id = pipeline.getFirstNode().getUuidString();
ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.StreamInit)
@@ -607,7 +609,8 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
public void cleanup(boolean invalidateClient) {
if (xceiverClientFactory != null) {
- xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+ xceiverClientFactory.releaseClient(xceiverClient, invalidateClient,
+ true);
}
xceiverClientFactory = null;
xceiverClient = null;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 3f92ec634a..e543c39c95 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -203,13 +203,23 @@ public final class RatisHelper {
toRaftPeers(pipeline));
}
+ /**
+ * Create a Raft client used primarily for Ozone client communications with
+ * the Ratis pipeline.
+ * @param rpcType rpc type
+ * @param pipeline pipeline
+ * @param retryPolicy retry policy
+ * @param tlsConfig tls config
+ * @param ozoneConfiguration configuration
+   * @return Raft client
+   * @throws IOException if the closest or leader node cannot be determined
+ */
public static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
ConfigurationSource ozoneConfiguration) throws IOException {
return newRaftClient(rpcType,
toRaftPeerId(pipeline.getLeaderNode()),
- // TODO (HDDS-9392): Update this to getClosestNode
- toRaftPeer(pipeline.getFirstNode()),
+ toRaftPeer(pipeline.getClosestNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
}
@@ -448,14 +458,14 @@ public final class RatisHelper {
RaftPeerId primaryId = null;
List<RaftPeerId> raftPeers = new ArrayList<>();
- for (DatanodeDetails dn : pipeline.getNodes()) {
+ for (DatanodeDetails dn : pipeline.getNodesInOrder()) {
final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
try {
- if (dn == pipeline.getFirstNode()) {
+ if (dn == pipeline.getClosestNode()) {
primaryId = raftPeerId;
}
} catch (IOException e) {
- LOG.error("Can not get FirstNode from the pipeline: {} with " +
+ LOG.error("Can not get primary node from the pipeline: {} with " +
"exception: {}", pipeline, e.getLocalizedMessage());
return null;
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index dd59972ca2..b072c3690b 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -142,6 +142,8 @@ public final class
ScmBlockLocationProtocolClientSideTranslatorPB
* @param num - number of blocks.
* @param replicationConfig - replication configuration of the blocks.
* @param excludeList - exclude list while allocating blocks.
+   * @param clientMachine - client address; depending on the request, this
+   *                        can be a hostname or an IP address.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 8b8e8c474e..8e80b38104 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -82,7 +82,8 @@ public class BlockDataStreamOutputEntryPool implements
KeyMetadataAware {
.setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
.setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
- .setMultipartUploadPartNumber(partNumber).build();
+ .setMultipartUploadPartNumber(partNumber)
+ .setSortDatanodesInPipeline(true).build();
this.requestID = requestId;
this.openID = openID;
this.excludeList = createExcludeList();
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c85e2ed389..6aed19fb9d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1401,6 +1401,7 @@ public class RpcClient implements ClientProtocol {
.setDataSize(size)
.setReplicationConfig(replicationConfig)
.addAllMetadataGdpr(metadata)
+ .setSortDatanodesInPipeline(true)
.setAcls(getAclList());
OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
@@ -1797,7 +1798,8 @@ public class RpcClient implements ClientProtocol {
private OpenKeySession newMultipartOpenKey(
String volumeName, String bucketName, String keyName,
- long size, int partNumber, String uploadID) throws IOException {
+ long size, int partNumber, String uploadID,
+ boolean sortDatanodesInPipeline) throws IOException {
verifyVolumeName(volumeName);
verifyBucketName(bucketName);
if (checkKeyNameEnabled) {
@@ -1819,6 +1821,7 @@ public class RpcClient implements ClientProtocol {
.setIsMultipartKey(true)
.setMultipartUploadID(uploadID)
.setMultipartUploadPartNumber(partNumber)
+ .setSortDatanodesInPipeline(sortDatanodesInPipeline)
.setAcls(getAclList())
.build();
return ozoneManagerClient.openKey(keyArgs);
@@ -1829,7 +1832,7 @@ public class RpcClient implements ClientProtocol {
String volumeName, String bucketName, String keyName,
long size, int partNumber, String uploadID) throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
- volumeName, bucketName, keyName, size, partNumber, uploadID);
+ volumeName, bucketName, keyName, size, partNumber, uploadID, false);
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
@@ -1849,7 +1852,7 @@ public class RpcClient implements ClientProtocol {
String uploadID)
throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
- volumeName, bucketName, keyName, size, partNumber, uploadID);
+ volumeName, bucketName, keyName, size, partNumber, uploadID, true);
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
@@ -2099,6 +2102,7 @@ public class RpcClient implements ClientProtocol {
.setReplicationConfig(replicationConfig)
.setAcls(getAclList())
.setLatestVersionLocation(getLatestVersionLocation)
+ .setSortDatanodesInPipeline(true)
.build();
OpenKeySession keySession =
ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]