Author: arp
Date: Tue Feb 25 02:16:29 2014
New Revision: 1571542

URL: http://svn.apache.org/r1571542
Log:
HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1571542&r1=1571541&r2=1571542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Feb 25 02:16:29 2014
@@ -613,6 +613,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5981. PBImageXmlWriter generates malformed XML.
     (Haohui Mai via cnauroth)
 
+    HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1571542&r1=1571541&r2=1571542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Feb 25 02:16:29 2014
@@ -1108,6 +1108,7 @@ public class DFSOutputStream extends FSO
             excluded.length > 0 ? excluded : null);
         block = lb.getBlock();
         block.setNumBytes(0);
+        bytesSent = 0;
         accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1571542&r1=1571541&r2=1571542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Tue Feb 25 02:16:29 2014
@@ -101,7 +101,10 @@ class BPServiceActor implements Runnable
   private final Map<String, PerStoragePendingIncrementalBR>
       pendingIncrementalBRperStorage = Maps.newHashMap();
 
-  private volatile int pendingReceivedRequests = 0;
+  // IBR = Incremental Block Report. If this flag is set then an IBR will be
+  // sent immediately by the actor thread without waiting for the IBR timer
+  // to elapse.
+  private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
   private final DNConf dnConf;
@@ -283,12 +286,10 @@ class BPServiceActor implements Runnable
         if (perStorageMap.getBlockInfoCount() > 0) {
           // Send newly-received and deleted blockids to namenode
           ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
-          pendingReceivedRequests =
-              (pendingReceivedRequests > rdbi.length ?
-                  (pendingReceivedRequests - rdbi.length) : 0);
           reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
         }
       }
+      sendImmediateIBR = false;
     }
 
     if (reports.size() == 0) {
@@ -312,8 +313,8 @@ class BPServiceActor implements Runnable
             // didn't put something newer in the meantime.
             PerStoragePendingIncrementalBR perStorageMap =
                 pendingIncrementalBRperStorage.get(report.getStorageID());
-            pendingReceivedRequests +=
-                perStorageMap.putMissingBlockInfos(report.getBlocks());
+            perStorageMap.putMissingBlockInfos(report.getBlocks());
+            sendImmediateIBR = true;
           }
         }
       }
@@ -371,7 +372,7 @@ class BPServiceActor implements Runnable
       ReceivedDeletedBlockInfo bInfo, String storageUuid) {
     synchronized (pendingIncrementalBRperStorage) {
       addPendingReplicationBlockInfo(bInfo, storageUuid);
-      pendingReceivedRequests++;
+      sendImmediateIBR = true;
       pendingIncrementalBRperStorage.notifyAll();
     }
   }
@@ -433,6 +434,11 @@ class BPServiceActor implements Runnable
     }
   }
 
+  @VisibleForTesting
+  boolean hasPendingIBR() {
+    return sendImmediateIBR;
+  }
+
   /**
    * Report the list blocks to the Namenode
    * @return DatanodeCommands returned by the NN. May be null.
@@ -676,8 +682,8 @@ class BPServiceActor implements Runnable
             }
           }
         }
-        if (pendingReceivedRequests > 0
-            || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+        if (sendImmediateIBR ||
+            (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
           reportReceivedDeletedBlocks();
           lastDeletedReport = startTime;
         }
@@ -701,7 +707,7 @@ class BPServiceActor implements Runnable
         long waitTime = dnConf.heartBeatInterval - 
         (Time.now() - lastHeartbeat);
         synchronized(pendingIncrementalBRperStorage) {
-          if (waitTime > 0 && pendingReceivedRequests == 0) {
+          if (waitTime > 0 && !sendImmediateIBR) {
             try {
               pendingIncrementalBRperStorage.wait(waitTime);
             } catch (InterruptedException ie) {

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java?rev=1571542&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java Tue Feb 25 02:16:29 2014
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static junit.framework.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Verify that incremental block reports (IBRs) are sent in response to block
+ * additions (immediately) and deletions (when the IBR timer elapses).
+ */
+public class TestIncrementalBlockReports {
+  public static final Log LOG = LogFactory.getLog(TestIncrementalBlockReports.class);
+
+  private static final short DN_COUNT = 1;
+  private static final long DUMMY_BLOCK_ID = 5678;
+  private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024;
+  private static final long DUMMY_BLOCK_GENSTAMP = 1000;
+
+  private MiniDFSCluster cluster = null;
+  private DistributedFileSystem fs;
+  private Configuration conf;
+  private NameNode singletonNn;
+  private DataNode singletonDn;
+  private BPOfferService bpos;    // BPOS to use for block injection.
+  private BPServiceActor actor;   // BPSA to use for block injection.
+  private String storageUuid;     // DatanodeStorage to use for block injection.
+
+  @Before
+  public void startCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
+    fs = cluster.getFileSystem();
+    singletonNn = cluster.getNameNode();
+    singletonDn = cluster.getDataNodes().get(0);
+    bpos = singletonDn.getAllBpOs()[0];
+    actor = bpos.getBPServiceActors().get(0);
+    storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID();
+  }
+
+  private static Block getDummyBlock() {
+    return new Block(DUMMY_BLOCK_ID, DUMMY_BLOCK_LENGTH, DUMMY_BLOCK_GENSTAMP);
+  }
+
+  /**
+   * Inject a fake 'received' block into the BPServiceActor state.
+   */
+  private void injectBlockReceived() {
+    ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
+        getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
+    actor.notifyNamenodeBlockImmediately(rdbi, storageUuid);
+  }
+
+  /**
+   * Inject a fake 'deleted' block into the BPServiceActor state.
+   */
+  private void injectBlockDeleted() {
+    ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
+        getDummyBlock(), BlockStatus.DELETED_BLOCK, null);
+    actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
+  }
+
+  /**
+   * Spy on calls from the DN to the NN.
+   * @return spy object that can be used for Mockito verification.
+   */
+  DatanodeProtocolClientSideTranslatorPB spyOnDnCallsToNn() {
+    return DataNodeTestUtils.spyOnBposToNN(singletonDn, singletonNn);
+  }
+
+  /**
+   * Ensure that an IBR is generated immediately for a block received by
+   * the DN.
+   *
+   * @throws InterruptedException if interrupted while waiting for the IBR
+   * @throws IOException
+   */
+  @Test (timeout=60000)
+  public void testReportBlockReceived() throws InterruptedException, IOException {
+    try {
+      DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
+      injectBlockReceived();
+
+      // Wait briefly (2 seconds); this is necessary since the IBR is
+      // generated asynchronously by the BPServiceActor thread.
+      Thread.sleep(2000);
+
+      // Ensure that the received block was reported immediately.
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+    } finally {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Ensure that a delayed IBR is generated for a block deleted on the DN.
+   *
+   * @throws InterruptedException if interrupted while waiting for the IBR
+   * @throws IOException
+   */
+  @Test (timeout=60000)
+  public void testReportBlockDeleted() throws InterruptedException, IOException {
+    try {
+      // Trigger a block report to reset the IBR timer.
+      DataNodeTestUtils.triggerBlockReport(singletonDn);
+
+      // Spy on calls from the DN to the NN
+      DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
+      injectBlockDeleted();
+
+      // Wait briefly (2 seconds) since the IBR, if any, is generated
+      // asynchronously by the BPServiceActor thread.
+      Thread.sleep(2000);
+
+      // Ensure that no block report was generated immediately.
+      // Deleted blocks are reported when the IBR timer elapses.
+      Mockito.verify(nnSpy, times(0)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      // Trigger a block report, this also triggers an IBR.
+      DataNodeTestUtils.triggerBlockReport(singletonDn);
+      Thread.sleep(2000);
+
+      // Ensure that the deleted block is reported.
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+    } finally {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Add a received block entry and then replace it. Ensure that a single
+   * IBR is generated and that no IBR remains pending afterwards.
+   * This test case verifies the failure in HDFS-5922.
+   *
+   * @throws InterruptedException if interrupted while waiting for the IBR
+   * @throws IOException
+   */
+  @Test (timeout=60000)
+  public void testReplaceReceivedBlock() throws InterruptedException, IOException {
+    try {
+      // Spy on calls from the DN to the NN
+      DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
+      injectBlockReceived();
+      injectBlockReceived();    // Overwrite the existing entry.
+
+      // Wait briefly (2 seconds) since the IBR is generated
+      // asynchronously by the BPServiceActor thread.
+      Thread.sleep(2000);
+
+      // Ensure that the received block is reported.
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      // Ensure that no more IBRs are pending.
+      assertFalse(actor.hasPendingIBR());
+
+    } finally {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+}


Reply via email to