HDFS-10303. DataStreamer#ResponseProcessor calculates packet ack latency incorrectly. Contributed by Surendra Singh Lilhore.
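For context: before this patch, ResponseProcessor timed only the blocking ack.readFields() call, so time spent simply waiting for the next ack to arrive (for example, while the writer paused between flushes) was attributed to the datanode pipeline and logged as a slow read. The patch instead records each packet's send time, keyed by sequence number, when the packet moves from dataQueue to ackQueue, and measures latency from send to ack. Below is a minimal standalone sketch of that accounting; the names (AckLatencySketch, onPacketSent, onAckReceived) are illustrative, not the HDFS API, and in HDFS the threshold comes from dfs.client.slow.io.warning.threshold.ms while the map is guarded by the dataQueue lock:

import java.util.HashMap;
import java.util.Map;

// Sketch of send-to-ack latency accounting; all names are hypothetical
// and the class is single-threaded, unlike the real DataStreamer.
class AckLatencySketch {
  // Send timestamp (ms) per in-flight packet, keyed by sequence number,
  // mirroring the packetSendTime map this commit adds to DataStreamer.
  private final Map<Long, Long> sendTimeMs = new HashMap<>();
  private final long slowThresholdMs = 10_000;

  // Called when a packet is handed to the pipeline (dataQueue -> ackQueue).
  void onPacketSent(long seqno) {
    sendTimeMs.put(seqno, System.nanoTime() / 1_000_000L);
  }

  // Called once the ack for seqno has been read back from the pipeline.
  void onAckReceived(long seqno) {
    Long begin = sendTimeMs.remove(seqno);
    if (begin == null) {
      return; // no entry, e.g. the map was cleared by a pipeline reset
    }
    long durationMs = System.nanoTime() / 1_000_000L - begin;
    if (durationMs > slowThresholdMs) {
      System.out.println("Slow ack: seqno=" + seqno + " took " + durationMs
          + "ms (threshold=" + slowThresholdMs + "ms)");
    }
  }

  // Called when a pipeline error requeues unacked packets; they receive
  // fresh timestamps when resent, so stale entries must not survive.
  void onPipelineReset() {
    sendTimeMs.clear();
  }
}

The null check is load-bearing for the same reason it is in the patch: an ack that races with pipeline recovery finds no send time and is skipped rather than mis-measured.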
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a5819da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a5819da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a5819da

Branch: refs/heads/YARN-3368
Commit: 4a5819dae2b0ca8f8b6d94ef464882d079d86593
Parents: 08ea07f
Author: Kihwal Lee <kih...@apache.org>
Authored: Tue May 17 08:53:31 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Tue May 17 08:53:31 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java   | 28 +++++--
 .../org/apache/hadoop/hdfs/TestDataStream.java | 85 ++++++++++++++++++++
 2 files changed, 105 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a5819da/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4d5f16c..1b7306a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -32,10 +32,12 @@ import java.net.Socket;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -395,6 +397,7 @@ class DataStreamer extends Daemon {
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
   protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+  private final Map<Long, Long> packetSendTime = new HashMap<>();
   private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private final ByteArrayManager byteArrayManager;
@@ -644,6 +647,7 @@ class DataStreamer extends Daemon {
         scope = null;
         dataQueue.removeFirst();
         ackQueue.addLast(one);
+        packetSendTime.put(one.getSeqno(), Time.monotonicNow());
         dataQueue.notifyAll();
       }
     }
@@ -957,15 +961,21 @@ class DataStreamer extends Daemon {
           // process responses from datanodes.
          try {
            // read an ack from the pipeline
-            long begin = Time.monotonicNow();
            ack.readFields(blockReplyStream);
-            long duration = Time.monotonicNow() - begin;
-            if (duration > dfsclientSlowLogThresholdMs
-                && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
-              LOG.warn("Slow ReadProcessor read fields took " + duration
-                  + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
-                  + ack + ", targets: " + Arrays.asList(targets));
-            } else {
+            if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
+              Long begin = packetSendTime.get(ack.getSeqno());
+              if (begin != null) {
+                long duration = Time.monotonicNow() - begin;
+                if (duration > dfsclientSlowLogThresholdMs) {
+                  LOG.info("Slow ReadProcessor read fields for block " + block
+                      + " took " + duration + "ms (threshold="
+                      + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
+                      + ", targets: " + Arrays.asList(targets));
+                }
+              }
+            }
+
+            if (LOG.isDebugEnabled()) {
              LOG.debug("DFSClient {}", ack);
            }
@@ -1047,6 +1057,7 @@ class DataStreamer extends Daemon {
           lastAckedSeqno = seqno;
           pipelineRecoveryCount = 0;
           ackQueue.removeFirst();
+          packetSendTime.remove(seqno);
           dataQueue.notifyAll();

           one.releaseBuffer(byteArrayManager);
@@ -1105,6 +1116,7 @@ class DataStreamer extends Daemon {
     synchronized (dataQueue) {
       dataQueue.addAll(0, ackQueue);
       ackQueue.clear();
+      packetSendTime.clear();
     }

     // If we had to recover the pipeline five times in a row for the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a5819da/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
new file mode 100644
index 0000000..3351b68
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataStream.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDataStream {
+  static MiniDFSCluster cluster;
+  static int PACKET_SIZE = 1024;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+        PACKET_SIZE);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+        10000);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @Test(timeout = 60000)
+  public void testDfsClient() throws IOException, InterruptedException {
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+        .getLog(DataStreamer.class));
+    byte[] toWrite = new byte[PACKET_SIZE];
+    new Random(1).nextBytes(toWrite);
+    final Path path = new Path("/file1");
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    FSDataOutputStream out = null;
+    out = dfs.create(path, false);
+
+    out.write(toWrite);
+    out.write(toWrite);
+    out.hflush();
+
+    //Wait to cross slow IO warning threshold
+    Thread.sleep(15 * 1000);
+
+    out.write(toWrite);
+    out.write(toWrite);
+    out.hflush();
+
+    //Wait for capturing logs in busy cluster
+    Thread.sleep(5 * 1000);
+
+    out.close();
+    logs.stopCapturing();
+    GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+        "Slow ReadProcessor read fields for block");
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+}
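A note on what the new test pins down: the slow-I/O warning threshold is lowered to 10 seconds and the test deliberately sleeps for 15 seconds between two hflush() calls, with the client socket timeout raised, presumably so that heartbeat packets do not fire during the gap. Under the old timing, ResponseProcessor would have sat blocked in readFields() across that idle window and then reported roughly 15000 ms against the 10000 ms threshold for the first real ack after the pause, a false positive, since no packet was actually outstanding. With send-to-ack timing nothing is in flight during the pause, so assertDoesNotMatch can require that "Slow ReadProcessor read fields for block" never appears in the captured DataStreamer log.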