This is an automated email from the ASF dual-hosted git repository.
shahrs87 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 126322024d04 HDFS-17299. Adding rack failure tolerance when creating a new file (#6612)
126322024d04 is described below
commit 126322024d04d406fc219eadcd704aba7ad7d3b0
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Thu Mar 14 15:31:53 2024 -0700
HDFS-17299. Adding rack failure tolerance when creating a new file (#6612)
---
.../java/org/apache/hadoop/hdfs/DataStreamer.java | 69 +++
.../apache/hadoop/hdfs/StripedDataStreamer.java| 12 +-
.../hadoop/hdfs/server/datanode/BlockReceiver.java | 5 +-
.../server/datanode/fsdataset/FsDatasetSpi.java| 12 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 21 +++-
.../hadoop/hdfs/TestDistributedFileSystem.java | 131 -
.../hdfs/server/datanode/SimulatedFSDataset.java | 6 +
.../datanode/extdataset/ExternalDatasetImpl.java | 6 +
8 files changed, 228 insertions(+), 34 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index ccc2dfe20688..120083ab671e 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -85,6 +85,7 @@ import
org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
import
org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -608,17 +609,17 @@ class DataStreamer extends Daemon {
this.accessToken = t;
}
- private void setPipeline(LocatedBlock lb) {
+ protected void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
- private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
- String[] storageIDs) {
+ protected void setPipeline(DatanodeInfo[] newNodes, StorageType[]
newStorageTypes,
+ String[] newStorageIDs) {
synchronized (nodesLock) {
- this.nodes = nodes;
+ this.nodes = newNodes;
}
-this.storageTypes = storageTypes;
-this.storageIDs = storageIDs;
+this.storageTypes = newStorageTypes;
+this.storageIDs = newStorageIDs;
}
/**
@@ -713,7 +714,7 @@ class DataStreamer extends Daemon {
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
- setPipeline(nextBlockOutputStream());
+ setupPipelineForCreate();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
@@ -1519,8 +1520,11 @@ class DataStreamer extends Daemon {
* it can be written to.
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
+ *
+ * Returns whether the pipeline was set up successfully.
+ * This boolean is used upstream to decide whether to continue
+ * creating the pipeline or to throw an exception.
*/
- private void setupPipelineForAppendOrRecovery() throws IOException {
+ private boolean setupPipelineForAppendOrRecovery() throws IOException {
// Check number of datanodes. Note that if there is no healthy datanode,
// this must be internal error because we mark external error in striped
// outputstream only when all the streamers are in the DATA_STREAMING stage
@@ -1530,33 +1534,46 @@ class DataStreamer extends Daemon {
LOG.warn(msg);
lastException.set(new IOException(msg));
streamerClosed = true;
- return;
+ return false;
}
-setupPipelineInternal(nodes, storageTypes, storageIDs);
+return setupPipelineInternal(nodes, storageTypes, storageIDs);
}
- protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+ protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,
StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
throws IOException {
boolean success = false;
long newGS = 0L;
+boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
while (!success && !streamerClosed && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
-return;
+return false;
}
- final boolean