This is an automated email from the ASF dual-hosted git repository.
tasanuma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f6b3361 HDFS-16293. Client sleeps and holds 'dataQueue' when DataNodes are congested. Contributed by Yuanxin Zhu.
f6b3361 is described below
commit f6b3361b4bc4f7168f3a5e21d16057f36d7088eb
Author: Takanobu Asanuma <[email protected]>
AuthorDate: Mon Dec 6 10:44:36 2021 +0900
HDFS-16293. Client sleeps and holds 'dataQueue' when DataNodes are congested. Contributed by Yuanxin Zhu.
(cherry picked from commit e8e69de106c03d041a0d280ea727e3f252460163)
---
.../java/org/apache/hadoop/hdfs/DataStreamer.java | 13 ++--
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 81 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 772eb66..358a485 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -685,11 +685,6 @@ class DataStreamer extends Daemon {
continue;
}
// get packet to be sent.
- try {
- backOffIfNecessary();
- } catch (InterruptedException e) {
- LOG.debug("Thread interrupted", e);
- }
one = dataQueue.getFirst(); // regular data packet
SpanContext[] parents = one.getTraceParents();
if (parents != null && parents.length > 0) {
@@ -702,6 +697,14 @@ class DataStreamer extends Daemon {
}
}
+ // The DataStreamer has to release the dataQueue before sleeping;
+ // otherwise the ResponseProcessor is delayed in accepting ACKs.
+ try {
+ backOffIfNecessary();
+ } catch (InterruptedException e) {
+ LOG.debug("Thread interrupted", e);
+ }
+
// get new block from namenode.
LOG.debug("stage={}, {}", stage, this);
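For context, the fix is about lock ordering rather than new logic: backOffIfNecessary() can sleep for up to the back-off maximum, and before this change it ran while the streamer still held the dataQueue monitor, which the ResponseProcessor also needs in order to process ACKs. A minimal sketch of the corrected ordering (plain Java; the class and method names below are illustrative stand-ins, not the actual HDFS internals):

    import java.util.LinkedList;

    // Sketch only: sleep outside the queue monitor so other threads
    // (e.g. an ACK processor) are not blocked during the back-off.
    class StreamerSketch {
      private final LinkedList<Object> dataQueue = new LinkedList<>();

      void sendLoop() throws InterruptedException {
        Object packet;
        synchronized (dataQueue) {
          while (dataQueue.isEmpty()) {
            dataQueue.wait();        // releases the monitor while waiting
          }
          packet = dataQueue.getFirst();
        }
        // Back off *after* releasing the monitor. Sleeping inside the
        // synchronized block would stall every thread that also locks
        // dataQueue.
        backOffIfCongested();
        send(packet);
      }

      private void backOffIfCongested() throws InterruptedException {
        Thread.sleep(100);           // stand-in for the real back-off
      }

      private void send(Object packet) {
        // write the packet to the pipeline (omitted)
      }
    }

With this ordering, an ACK that arrives during the back-off can still acquire dataQueue and record congestion immediately, which is what the new test below asserts.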
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 8854ec8..54c292b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
@@ -296,6 +297,86 @@ public class TestDFSOutputStream {
Assert.assertTrue(congestedNodes.isEmpty());
}
+ @Test(timeout=60000)
+ public void testCongestionAckDelay() {
+ DfsClientConf dfsClientConf = mock(DfsClientConf.class);
+ DFSClient client = mock(DFSClient.class);
+ when(client.getConf()).thenReturn(dfsClientConf);
+ when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
+ client.clientRunning = true;
+ DataStreamer stream = new DataStreamer(
+ mock(HdfsFileStatus.class),
+ mock(ExtendedBlock.class),
+ client,
+ "foo", null, null, null, null, null, null);
+ DataOutputStream blockStream = mock(DataOutputStream.class);
+ Whitebox.setInternalState(stream, "blockStream", blockStream);
+ Whitebox.setInternalState(stream, "stage",
+ BlockConstructionStage.PIPELINE_CLOSE);
+ @SuppressWarnings("unchecked")
+ LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
+ Whitebox.getInternalState(stream, "dataQueue");
+ @SuppressWarnings("unchecked")
+ ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
+ Whitebox.getInternalState(stream, "congestedNodes");
+ int backOffMaxTime = (int)
+ Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
+ DFSPacket[] packet = new DFSPacket[100];
+ AtomicBoolean isDelay = new AtomicBoolean(true);
+
+ // This thread plays the ResponseProcessor, which needs to acquire the dataQueue.
+ new Thread(() -> {
+ for (int i = 0; i < 10; i++) {
+ // Sleep for a while so that the other threads get time to run,
+ // to avoid affecting the results.
+ try {
+ Thread.sleep(backOffMaxTime / 50);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ synchronized (dataQueue) {
+ congestedNodes.add(mock(DatanodeInfo.class));
+ // The DataStreamer releases the dataQueue before sleeping, so the
+ // ResponseProcessor has time to hold the dataQueue, keep accepting
+ // ACKs, and add congested nodes to the list. Therefore,
+ // congestedNodes.size() becomes greater than 1.
+ if (congestedNodes.size() > 1) {
+ isDelay.set(false);
+ try {
+ doThrow(new IOException()).when(blockStream).flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ try {
+ doThrow(new IOException()).when(blockStream).flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // Prevent the DataStreamer from waiting forever on an empty
+ // dataQueue, which would keep the unit test from exiting.
+ DFSPacket endPacket = mock(DFSPacket.class);
+ dataQueue.add(endPacket);
+ }).start();
+
+ // Packets are added to the dataQueue so that the DataStreamer runs
+ // normally and decides, based on the congestion, whether to enter
+ // the sleep state.
+ new Thread(() -> {
+ for (int i = 0; i < 100; i++) {
+ packet[i] = mock(DFSPacket.class);
+ dataQueue.add(packet[i]);
+ try {
+ Thread.sleep(backOffMaxTime / 100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ stream.run();
+ Assert.assertFalse(isDelay.get());
+ }
+
@Test
public void testNoLocalWriteFlag() throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
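As a rough, self-contained illustration of what the test above measures (plain Java, no HDFS or Mockito; every name here is invented for the demo): one thread plays the ResponseProcessor and records congestion reports under the queue lock, while the main thread backs off with the lock released, so several reports can accumulate during a single back-off.

    import java.util.ArrayList;
    import java.util.List;

    public class BackOffDemo {
      public static void main(String[] args) throws Exception {
        final Object dataQueue = new Object();
        final List<String> congestedNodes = new ArrayList<>();

        // Plays the ResponseProcessor: records congestion under the lock.
        Thread acks = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            synchronized (dataQueue) {
              congestedNodes.add("dn-" + i);
            }
            try {
              Thread.sleep(20);
            } catch (InterruptedException e) {
              return;
            }
          }
        });
        acks.start();

        // Plays the DataStreamer: inspect state under the lock, then
        // back off with the lock released (the fixed ordering).
        synchronized (dataQueue) {
          // examine dataQueue / congestedNodes here, then release
        }
        Thread.sleep(200);  // the back-off happens without the lock held

        acks.join();
        synchronized (dataQueue) {
          // With the fixed ordering, reports accumulate while the
          // streamer backs off; with the old ordering they would all
          // be blocked until the sleep ended.
          System.out.println("reports: " + congestedNodes.size());
        }
      }
    }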
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]