Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 0a4d8f658 -> eecc68d4d


HDFS-9726. Refactor IBR code to a new class. Contributed by Tsz-Wo Nicholas 
Sze. Backport HDFS-11839 by Vinitha Reddy Gankidi.

(cherry picked from commit 494b6c7c4b5e0a861fcbdcfb89cfc4869247e710)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eecc68d4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eecc68d4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eecc68d4

Branch: refs/heads/branch-2.7
Commit: eecc68d4d8a3b8d29f972041748fa244691eeaa8
Parents: 0a4d8f6
Author: Konstantin V Shvachko <s...@apache.org>
Authored: Thu May 18 18:09:36 2017 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Thu May 18 18:09:36 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/BPOfferService.java    |  45 ++-
 .../hdfs/server/datanode/BPServiceActor.java    | 282 +++----------------
 .../datanode/IncrementalBlockReportManager.java | 233 +++++++++++++++
 .../server/datanode/TestBPOfferService.java     |   2 +-
 .../datanode/TestIncrementalBlockReports.java   |  11 +-
 .../server/datanode/TestTriggerBlockReport.java |   5 +-
 7 files changed, 298 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 32a5069..941eeac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -91,6 +91,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-7990. IBR delete ack should not be delayed. (Daryn Sharp.
     Backport HDFS-11838 by Vinitha Gankidi)
 
+    HDFS-9726. Refactor IBR code to a new class. (Tsz-Wo Nicholas Sze
+    Backport HDFS-11839 by Vinitha Reddy Gankidi)
+
   OPTIMIZATIONS
 
     HDFS-10896. Move lock logging logic from FSNamesystem into 
FSNamesystemLock.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 53180ff..b756f39 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -232,14 +232,27 @@ class BPOfferService {
    */
   void notifyNamenodeReceivedBlock(
       ExtendedBlock block, String delHint, String storageUuid) {
+    notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
+        storageUuid);
+  }
+
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
+    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid);
+  }
+
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
+    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid);
+  }
+
+  private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
+      String delHint, String storageUuid) {
     checkBlock(block);
-    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
-        block.getLocalBlock(),
-        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
-        delHint);
+    final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
+        block.getLocalBlock(), status, delHint);
+    final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlock(bInfo, storageUuid, true);
+      actor.getIbrManager().notifyNamenodeBlock(info, storage);
     }
   }
 
@@ -250,26 +263,6 @@ class BPOfferService {
         "block belongs to BP %s instead of BP %s",
         block.getBlockPoolId(), getBlockPoolId());
   }
-  
-  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
-    checkBlock(block);
-    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
-       block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
-    
-    for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
-    }
-  }
-  
-  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
-    checkBlock(block);
-    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
-       block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
-    
-    for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlock(bInfo, storageUuid, false);
-    }
-  }
 
   //This must be called only by blockPoolManager
   void start() {
@@ -576,7 +569,7 @@ class BPOfferService {
   @VisibleForTesting
   void triggerDeletionReportForTests() throws IOException {
     for (BPServiceActor actor : bpServices) {
-      actor.triggerDeletionReportForTests();
+      actor.getIbrManager().triggerDeletionReportForTests();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 38b7e94..865481b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -29,7 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -51,9 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
-import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -63,7 +60,7 @@ import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import com.google.common.base.Joiner;
 
 /**
  * A thread per active or standby namenode to perform:
@@ -94,23 +91,13 @@ class BPServiceActor implements Runnable {
   }
 
   private volatile RunningState runningState = RunningState.CONNECTING;
-
-  /**
-   * Between block reports (which happen on the order of once an hour) the
-   * DN reports smaller incremental changes to its block list. This map,
-   * keyed by block ID, contains the pending changes which have yet to be
-   * reported to the NN. Access should be synchronized on this object.
-   */
-  private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
-      pendingIncrementalBRperStorage = Maps.newHashMap();
-
-  // IBR = Incremental Block Report. If this flag is set then an IBR will be
-  // sent immediately by the actor thread without waiting for the IBR timer
-  // to elapse.
-  private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
   private final DNConf dnConf;
+  private long prevBlockReportId = 0;
+
+  private final IncrementalBlockReportManager ibrManager
+      = new IncrementalBlockReportManager();
 
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue 
@@ -124,6 +111,14 @@ class BPServiceActor implements Runnable {
     scheduler = new Scheduler(dnConf.heartBeatInterval, 
dnConf.blockReportInterval);
   }
 
+  public DatanodeRegistration getBpRegistration() {
+    return bpRegistration;
+  }
+
+  IncrementalBlockReportManager getIbrManager() {
+    return ibrManager;
+  }
+
   boolean isAlive() {
     if (!shouldServiceRun || !bpThread.isAlive()) {
       return false;
@@ -224,140 +219,19 @@ class BPServiceActor implements Runnable {
     register(nsInfo);
   }
 
-  /**
-   * Report received blocks and delete hints to the Namenode for each
-   * storage.
-   *
-   * @throws IOException
-   */
-  private void reportReceivedDeletedBlocks() throws IOException {
-
-    // Generate a list of the pending reports for each storage under the lock
-    ArrayList<StorageReceivedDeletedBlocks> reports =
-        new 
ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
-    synchronized (pendingIncrementalBRperStorage) {
-      for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
-           pendingIncrementalBRperStorage.entrySet()) {
-        final DatanodeStorage storage = entry.getKey();
-        final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
-
-        if (perStorageMap.getBlockInfoCount() > 0) {
-          // Send newly-received and deleted blockids to namenode
-          ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
-          reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
-        }
-      }
-      sendImmediateIBR = false;
-    }
-
-    if (reports.size() == 0) {
-      // Nothing new to report.
-      return;
-    }
-
-    // Send incremental block reports to the Namenode outside the lock
-    boolean success = false;
-    final long startTime = monotonicNow();
-    try {
-      bpNamenode.blockReceivedAndDeleted(bpRegistration,
-          bpos.getBlockPoolId(),
-          reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
-      success = true;
-    } finally {
-      dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
-      if (!success) {
-        synchronized (pendingIncrementalBRperStorage) {
-          for (StorageReceivedDeletedBlocks report : reports) {
-            // If we didn't succeed in sending the report, put all of the
-            // blocks back onto our queue, but only in the case where we
-            // didn't put something newer in the meantime.
-            PerStoragePendingIncrementalBR perStorageMap =
-                pendingIncrementalBRperStorage.get(report.getStorage());
-            perStorageMap.putMissingBlockInfos(report.getBlocks());
-            sendImmediateIBR = true;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * @return pending incremental block report for given {@code storage}
-   */
-  private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
-      DatanodeStorage storage) {
-    PerStoragePendingIncrementalBR mapForStorage =
-        pendingIncrementalBRperStorage.get(storage);
-
-    if (mapForStorage == null) {
-      // This is the first time we are adding incremental BR state for
-      // this storage so create a new map. This is required once per
-      // storage, per service actor.
-      mapForStorage = new PerStoragePendingIncrementalBR();
-      pendingIncrementalBRperStorage.put(storage, mapForStorage);
-    }
-
-    return mapForStorage;
-  }
-
-  /**
-   * Add a blockInfo for notification to NameNode. If another entry
-   * exists for the same block it is removed.
-   *
-   * Caller must synchronize access using pendingIncrementalBRperStorage.
-   */
-  void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
-      DatanodeStorage storage) {
-    // Make sure another entry for the same block is first removed.
-    // There may only be one such entry.
-    for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
-          pendingIncrementalBRperStorage.entrySet()) {
-      if (entry.getValue().removeBlockInfo(bInfo)) {
-        break;
-      }
-    }
-    getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
-  }
-
-  /*
-   * Informing the name node could take a long long time! Should we wait
-   * till namenode is informed before responding with success to the
-   * client? For now we don't.
-   */
-  void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
-      String storageUuid, boolean now) {
-    synchronized (pendingIncrementalBRperStorage) {
-      addPendingReplicationBlockInfo(
-          bInfo, dn.getFSDataset().getStorage(storageUuid));
-      sendImmediateIBR = true;
-      // If now is true, the report is sent right away.
-      // Otherwise, it will be sent out in the next heartbeat.
-      if (now) {
-        pendingIncrementalBRperStorage.notifyAll();
-      }
-    }
-  }
-
-  void notifyNamenodeDeletedBlock(
-      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
-    synchronized (pendingIncrementalBRperStorage) {
-      addPendingReplicationBlockInfo(
-          bInfo, dn.getFSDataset().getStorage(storageUuid));
-    }
-  }
 
   /**
    * Run an immediate block report on this thread. Used by tests.
    */
   @VisibleForTesting
   void triggerBlockReportForTests() {
-    synchronized (pendingIncrementalBRperStorage) {
+    synchronized (ibrManager) {
       scheduler.scheduleHeartbeat();
       long nextBlockReportTime = scheduler.scheduleBlockReport(0);
-      pendingIncrementalBRperStorage.notifyAll();
+      ibrManager.notifyAll();
       while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
         try {
-          pendingIncrementalBRperStorage.wait(100);
+          ibrManager.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -367,12 +241,12 @@ class BPServiceActor implements Runnable {
   
   @VisibleForTesting
   void triggerHeartbeatForTests() {
-    synchronized (pendingIncrementalBRperStorage) {
+    synchronized (ibrManager) {
       final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
-      pendingIncrementalBRperStorage.notifyAll();
+      ibrManager.notifyAll();
       while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
         try {
-          pendingIncrementalBRperStorage.wait(100);
+          ibrManager.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -380,29 +254,6 @@ class BPServiceActor implements Runnable {
     }
   }
 
-  @VisibleForTesting
-  void triggerDeletionReportForTests() {
-    synchronized (pendingIncrementalBRperStorage) {
-      sendImmediateIBR = true;
-      pendingIncrementalBRperStorage.notifyAll();
-
-      while (sendImmediateIBR) {
-        try {
-          pendingIncrementalBRperStorage.wait(100);
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  boolean hasPendingIBR() {
-    return sendImmediateIBR;
-  }
-
-  private long prevBlockReportId = 0;
-
   private long generateUniqueBlockReportId() {
     long id = System.nanoTime();
     if (id <= prevBlockReportId) {
@@ -429,7 +280,8 @@ class BPServiceActor implements Runnable {
     // we have a chance that we will miss the delHint information
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
-    reportReceivedDeletedBlocks();
+    ibrManager.sendIBRs(bpNamenode, bpRegistration,
+        bpos.getBlockPoolId(), dn.getMetrics());
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -674,8 +526,9 @@ class BPServiceActor implements Runnable {
             }
           }
         }
-        if (sendImmediateIBR || sendHeartbeat) {
-          reportReceivedDeletedBlocks();
+        if (ibrManager.sendImmediately() || sendHeartbeat) {
+          ibrManager.sendIBRs(bpNamenode, bpRegistration,
+              bpos.getBlockPoolId(), dn.getMetrics());
         }
 
         List<DatanodeCommand> cmds = blockReport();
@@ -689,10 +542,10 @@ class BPServiceActor implements Runnable {
         // or work arrives, and then iterate again.
         //
         long waitTime = scheduler.getHeartbeatWaitTime();
-        synchronized(pendingIncrementalBRperStorage) {
-          if (waitTime > 0 && !sendImmediateIBR) {
+        synchronized(ibrManager) {
+          if (waitTime > 0 && !ibrManager.sendImmediately()) {
             try {
-              pendingIncrementalBRperStorage.wait(waitTime);
+              ibrManager.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -881,87 +734,20 @@ class BPServiceActor implements Runnable {
       // HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
       // for sometime.
       if (state == HAServiceState.STANDBY) {
-        pendingIncrementalBRperStorage.clear();
-      }
-    }
-  }
-
-  private static class PerStoragePendingIncrementalBR {
-    private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
-        Maps.newHashMap();
-
-    /**
-     * Return the number of blocks on this storage that have pending
-     * incremental block reports.
-     * @return
-     */
-    int getBlockInfoCount() {
-      return pendingIncrementalBR.size();
-    }
-
-    /**
-     * Dequeue and return all pending incremental block report state.
-     * @return
-     */
-    ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
-      ReceivedDeletedBlockInfo[] blockInfos =
-          pendingIncrementalBR.values().toArray(
-              new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
-
-      pendingIncrementalBR.clear();
-      return blockInfos;
-    }
-
-    /**
-     * Add blocks from blockArray to pendingIncrementalBR, unless the
-     * block already exists in pendingIncrementalBR.
-     * @param blockArray list of blocks to add.
-     * @return the number of missing blocks that we added.
-     */
-    int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
-      int blocksPut = 0;
-      for (ReceivedDeletedBlockInfo rdbi : blockArray) {
-        if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
-          pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
-          ++blocksPut;
-        }
+        ibrManager.clearIBRs();
       }
-      return blocksPut;
-    }
-
-    /**
-     * Add pending incremental block report for a single block.
-     * @param blockInfo
-     */
-    void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
-      pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
-    }
-
-    /**
-     * Remove pending incremental block report for a single block if it
-     * exists.
-     *
-     * @param blockInfo
-     * @return true if a report was removed, false if no report existed for
-     *         the given block.
-     */
-    boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
-      return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) 
!= null);
     }
   }
 
-  void triggerBlockReport(BlockReportOptions options) throws IOException {
+  void triggerBlockReport(BlockReportOptions options) {
     if (options.isIncremental()) {
       LOG.info(bpos.toString() + ": scheduling an incremental block report.");
-      synchronized(pendingIncrementalBRperStorage) {
-        sendImmediateIBR = true;
-        pendingIncrementalBRperStorage.notifyAll();
-      }
+      ibrManager.triggerIBR();
     } else {
       LOG.info(bpos.toString() + ": scheduling a full block report.");
-      synchronized(pendingIncrementalBRperStorage) {
+      synchronized(ibrManager) {
         scheduler.scheduleBlockReport(0);
-        pendingIncrementalBRperStorage.notifyAll();
+        ibrManager.notifyAll();
       }
     }
   }
@@ -991,10 +777,6 @@ class BPServiceActor implements Runnable {
       }
     }
   }
-  @VisibleForTesting
-  int getPendingIBRSize() {
-    return pendingIncrementalBRperStorage.size();
-  }
 
   Scheduler getScheduler() {
     return scheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
new file mode 100644
index 0000000..3ad59b9
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import 
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+/**
+ * Manage Incremental Block Reports (IBRs).
+ */
+@InterfaceAudience.Private
+class IncrementalBlockReportManager {
+  private static class PerStorageIBR {
+    /** The blocks in this IBR. */
+    final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
+
+    /**
+     * Remove the given block from this IBR
+     * @return true if the block was removed; otherwise, return false.
+     */
+    ReceivedDeletedBlockInfo remove(Block block) {
+      return blocks.remove(block);
+    }
+
+    /** @return all the blocks removed from this IBR. */
+    ReceivedDeletedBlockInfo[] removeAll() {
+      final int size = blocks.size();
+      if (size == 0) {
+        return null;
+      }
+
+      final ReceivedDeletedBlockInfo[] rdbis = blocks.values().toArray(
+          new ReceivedDeletedBlockInfo[size]);
+      blocks.clear();
+      return rdbis;
+    }
+
+    /** Put the block to this IBR. */
+    void put(ReceivedDeletedBlockInfo rdbi) {
+      blocks.put(rdbi.getBlock(), rdbi);
+    }
+
+    /**
+     * Put the all blocks to this IBR unless the block already exists.
+     * @param rdbis list of blocks to add.
+     * @return the number of missing blocks added.
+     */
+    int putMissing(ReceivedDeletedBlockInfo[] rdbis) {
+      int count = 0;
+      for (ReceivedDeletedBlockInfo rdbi : rdbis) {
+        if (!blocks.containsKey(rdbi.getBlock())) {
+          put(rdbi);
+          count++;
+        }
+      }
+      return count;
+    }
+  }
+
+  /**
+   * Between block reports (which happen on the order of once an hour) the
+   * DN reports smaller incremental changes to its block list for each storage.
+   * This map contains the pending changes not yet to be reported to the NN.
+   */
+  private final Map<DatanodeStorage, PerStorageIBR> pendingIBRs
+      = Maps.newHashMap();
+
+  /**
+   * If this flag is set then an IBR will be sent immediately by the actor
+   * thread without waiting for the IBR timer to elapse.
+   */
+  private volatile boolean readyToSend = false;
+
+  boolean sendImmediately() {
+    return readyToSend;
+  }
+
+  private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
+    final List<StorageReceivedDeletedBlocks> reports
+        = new ArrayList<>(pendingIBRs.size());
+    for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
+        : pendingIBRs.entrySet()) {
+      final PerStorageIBR perStorage = entry.getValue();
+
+        // Send newly-received and deleted blockids to namenode
+      final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
+      if (rdbi != null) {
+        reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
+      }
+    }
+    readyToSend = false;
+    return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);
+  }
+
+  private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) 
{
+    for (StorageReceivedDeletedBlocks r : reports) {
+      pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
+    }
+    if (reports.length > 0) {
+      readyToSend = true;
+    }
+  }
+
+  /** Send IBRs to namenode. */
+  void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
+      String bpid, DataNodeMetrics metrics) throws IOException {
+    // Generate a list of the pending reports for each storage under the lock
+    final StorageReceivedDeletedBlocks[] reports = generateIBRs();
+    if (reports.length == 0) {
+      // Nothing new to report.
+      return;
+    }
+
+    // Send incremental block reports to the Namenode outside the lock
+    boolean success = false;
+    final long startTime = monotonicNow();
+    try {
+      namenode.blockReceivedAndDeleted(registration, bpid, reports);
+      success = true;
+    } finally {
+      metrics.addIncrementalBlockReport(monotonicNow() - startTime);
+      if (!success) {
+        // If we didn't succeed in sending the report, put all of the
+        // blocks back onto our queue, but only in the case where we
+        // didn't put something newer in the meantime.
+        putMissing(reports);
+      }
+    }
+  }
+
+  /** @return the pending IBR for the given {@code storage} */
+  private PerStorageIBR getPerStorageIBR(DatanodeStorage storage) {
+    PerStorageIBR perStorage = pendingIBRs.get(storage);
+    if (perStorage == null) {
+      // This is the first time we are adding incremental BR state for
+      // this storage so create a new map. This is required once per
+      // storage, per service actor.
+      perStorage = new PerStorageIBR();
+      pendingIBRs.put(storage, perStorage);
+    }
+    return perStorage;
+  }
+
+  /**
+   * Add a block for notification to NameNode.
+   * If another entry exists for the same block it is removed.
+   */
+  @VisibleForTesting
+  synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
+      DatanodeStorage storage) {
+    // Make sure another entry for the same block is first removed.
+    // There may only be one such entry.
+    for (PerStorageIBR perStorage : pendingIBRs.values()) {
+      if (perStorage.remove(rdbi.getBlock()) != null) {
+        break;
+      }
+    }
+    getPerStorageIBR(storage).put(rdbi);
+  }
+
+  synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
+      DatanodeStorage storage) {
+    addRDBI(rdbi, storage);
+
+    final BlockStatus status = rdbi.getStatus();
+    if (status == BlockStatus.RECEIVING_BLOCK) {
+      // the report will be sent out in the next heartbeat.
+      readyToSend = true;
+    } else if (status == BlockStatus.RECEIVED_BLOCK) {
+      // the report is sent right away.
+      triggerIBR();
+    }
+  }
+
+  synchronized void triggerIBR() {
+    readyToSend = true;
+    notifyAll();
+  }
+
+  @VisibleForTesting
+  synchronized void triggerDeletionReportForTests() {
+    triggerIBR();
+
+    while (sendImmediately()) {
+      try {
+        wait(100);
+      } catch (InterruptedException e) {
+        return;
+      }
+    }
+  }
+
+  void clearIBRs() {
+    pendingIBRs.clear();
+  }
+
+  @VisibleForTesting
+  int getPendingIBRSize() {
+    return pendingIBRs.size();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index ea5792e..89cee1e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -761,7 +761,7 @@ public class TestBPOfferService {
     List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
     for (BPServiceActor bpServiceActor : bpServiceActors) {
       if (bpServiceActor.state == HAServiceState.STANDBY) {
-        return bpServiceActor.getPendingIBRSize();
+        return bpServiceActor.getIbrManager().getPendingIBRSize();
       }
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index cd2c125..a3d47e9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 
@@ -54,7 +55,6 @@ public class TestIncrementalBlockReports {
   private static final long DUMMY_BLOCK_GENSTAMP = 1000;
 
   private MiniDFSCluster cluster = null;
-  private DistributedFileSystem fs;
   private Configuration conf;
   private NameNode singletonNn;
   private DataNode singletonDn;
@@ -66,7 +66,6 @@ public class TestIncrementalBlockReports {
   public void startCluster() throws IOException {
     conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
-    fs = cluster.getFileSystem();
     singletonNn = cluster.getNameNode();
     singletonDn = cluster.getDataNodes().get(0);
     bpos = singletonDn.getAllBpOs()[0];
@@ -84,7 +83,8 @@ public class TestIncrementalBlockReports {
   private void injectBlockReceived() {
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
         getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
-    actor.notifyNamenodeBlock(rdbi, storageUuid, true);
+    DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
+    actor.getIbrManager().notifyNamenodeBlock(rdbi, s);
   }
 
   /**
@@ -93,7 +93,8 @@ public class TestIncrementalBlockReports {
   private void injectBlockDeleted() {
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
         getDummyBlock(), BlockStatus.DELETED_BLOCK, null);
-    actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
+    actor.getIbrManager().addRDBI(rdbi,
+        singletonDn.getFSDataset().getStorage(storageUuid));
   }
 
   /**
@@ -202,7 +203,7 @@ public class TestIncrementalBlockReports {
           any(StorageReceivedDeletedBlocks[].class));
 
       // Ensure that no more IBRs are pending.
-      assertFalse(actor.hasPendingIBR());
+      assertFalse(actor.getIbrManager().sendImmediately());
 
     } finally {
       cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eecc68d4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
index 3195d7d..4b50dec 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -94,7 +95,9 @@ public final class TestTriggerBlockReport {
         datanode.getAllBpOs()[0].getBPServiceActors().get(0);
     String storageUuid =
         datanode.getFSDataset().getVolumes().get(0).getStorageID();
-    actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
+    final DatanodeStorage storage = new DatanodeStorage(storageUuid);
+    actor.getIbrManager().addRDBI(rdbi, storage);
+
 
     // Manually trigger a block report.
     datanode.triggerBlockReport(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to