HDFS-10303. DataStreamer#ResponseProcessor calculates packet ack latency incorrectly. Contributed by Surendra Singh Lilhore.
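
In short: ResponseProcessor used to time only the blocking ack.readFields()
call, so a writer that simply sat idle between packets could blow past
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY and log a spurious "Slow
ReadProcessor" warning even though the pipeline acked promptly. The fix
records each packet's send time, keyed by sequence number, and measures ack
latency from that point. A condensed sketch of the new bookkeeping (names as
in the DataStreamer diff below):

    // When a packet moves from dataQueue to ackQueue, remember when it left.
    packetSendTime.put(one.getSeqno(), Time.monotonicNow());

    // When its ack arrives, latency is time-since-send, not time spent
    // blocked in readFields(). Heartbeat acks are skipped.
    Long begin = packetSendTime.get(ack.getSeqno());
    if (begin != null
        && Time.monotonicNow() - begin > dfsclientSlowLogThresholdMs) {
      LOG.info("Slow ReadProcessor read fields for block " + block + " ...");
    }

    // Entries are dropped once acked, or wholesale when the pipeline is
    // rebuilt and in-flight packets are requeued, so the map stays bounded.
    packetSendTime.remove(seqno);   // per acked packet
    packetSendTime.clear();         // on pipeline recovery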


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a5819da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a5819da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a5819da

Branch: refs/heads/YARN-3368
Commit: 4a5819dae2b0ca8f8b6d94ef464882d079d86593
Parents: 08ea07f
Author: Kihwal Lee <kih...@apache.org>
Authored: Tue May 17 08:53:31 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Tue May 17 08:53:31 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 28 +++++--
 .../org/apache/hadoop/hdfs/TestDataStream.java  | 85 ++++++++++++++++++++
 2 files changed, 105 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a5819da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4d5f16c..1b7306a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -32,10 +32,12 @@ import java.net.Socket;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -395,6 +397,7 @@ class DataStreamer extends Daemon {
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
   protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+  private final Map<Long, Long> packetSendTime = new HashMap<>();
   private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private final ByteArrayManager byteArrayManager;
@@ -644,6 +647,7 @@ class DataStreamer extends Daemon {
             scope = null;
             dataQueue.removeFirst();
             ackQueue.addLast(one);
+            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
             dataQueue.notifyAll();
           }
         }
@@ -957,15 +961,21 @@ class DataStreamer extends Daemon {
         // process responses from datanodes.
         try {
           // read an ack from the pipeline
-          long begin = Time.monotonicNow();
           ack.readFields(blockReplyStream);
-          long duration = Time.monotonicNow() - begin;
-          if (duration > dfsclientSlowLogThresholdMs
-              && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
-            LOG.warn("Slow ReadProcessor read fields took " + duration
-                + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
-                + ack + ", targets: " + Arrays.asList(targets));
-          } else {
+          if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
+            Long begin = packetSendTime.get(ack.getSeqno());
+            if (begin != null) {
+              long duration = Time.monotonicNow() - begin;
+              if (duration > dfsclientSlowLogThresholdMs) {
+                LOG.info("Slow ReadProcessor read fields for block " + block
+                    + " took " + duration + "ms (threshold="
+                    + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
+                    + ", targets: " + Arrays.asList(targets));
+              }
+            }
+          }
+
+          if (LOG.isDebugEnabled()) {
             LOG.debug("DFSClient {}", ack);
           }
 
@@ -1047,6 +1057,7 @@ class DataStreamer extends Daemon {
             lastAckedSeqno = seqno;
             pipelineRecoveryCount = 0;
             ackQueue.removeFirst();
+            packetSendTime.remove(seqno);
             dataQueue.notifyAll();
 
             one.releaseBuffer(byteArrayManager);
@@ -1105,6 +1116,7 @@ class DataStreamer extends Daemon {
     synchronized (dataQueue) {
       dataQueue.addAll(0, ackQueue);
       ackQueue.clear();
+      packetSendTime.clear();
     }
 
     // If we had to recover the pipeline five times in a row for the
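
The new test below reproduces the false positive the old measurement
produced: it sets the slow-IO warning threshold to 10 seconds, writes and
hflush()es, sleeps for 15 seconds, writes again, and asserts that no "Slow
ReadProcessor read fields for block" line was logged. Under the old code, the
ResponseProcessor would have been blocked in readFields() across the idle
gap, so the first ack after the sleep would have appeared to take 15+ seconds.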

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a5819da/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
new file mode 100644
index 0000000..3351b68
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDataStream {
+  static MiniDFSCluster cluster;
+  static int PACKET_SIZE = 1024;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+        PACKET_SIZE);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+        10000);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @Test(timeout = 60000)
+  public void testDfsClient() throws IOException, InterruptedException {
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+        .getLog(DataStreamer.class));
+    byte[] toWrite = new byte[PACKET_SIZE];
+    new Random(1).nextBytes(toWrite);
+    final Path path = new Path("/file1");
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    FSDataOutputStream out = null;
+    out = dfs.create(path, false);
+    
+    out.write(toWrite);
+    out.write(toWrite);
+    out.hflush();
+    
+    //Wait to cross slow IO warning threshold
+    Thread.sleep(15 * 1000);
+    
+    out.write(toWrite);
+    out.write(toWrite);
+    out.hflush();
+    
+    //Wait for capturing logs in busy cluster
+    Thread.sleep(5 * 1000);
+
+    out.close();
+    logs.stopCapturing();
+    GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+        "Slow ReadProcessor read fields for block");
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+}
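
To run just this test (standard Maven Surefire test selection, assuming a
full Hadoop source checkout; nothing here is specific to this commit):

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestDataStream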

