This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ab6b568  HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
ab6b568 is described below

commit ab6b5681e8baf43b5d6a50cc42c65c7a7a1760d7
Author: Kihwal Lee <kih...@apache.org>
AuthorDate: Wed Jun 16 11:38:30 2021 -0500

    HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  10 ++
 .../hadoop/hdfs/server/datanode/BlockScanner.java  |  33 ++++-
 .../hadoop/hdfs/server/datanode/DataNode.java      |   4 +-
 .../hadoop/hdfs/server/datanode/VolumeScanner.java |   3 +
 .../server/datanode/VolumeScannerCBInjector.java   |  51 +++++++
 .../src/main/resources/hdfs-default.xml            |   9 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  29 +++-
 .../hdfs/server/datanode/TestBlockScanner.java     | 148 +++++++++++++++++++++
 8 files changed, 280 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 1e53a2e..e71ed95 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -600,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24;  // 3 weeks.
   public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
   public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
+  /**
+   * The amount of time in milliseconds that the BlockScanner times out waiting
+   * for the VolumeScanner thread to join during a shutdown call.
+   */
+  public static final String  DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
+      "dfs.block.scanner.volume.join.timeout.ms";
+  public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 3d97022..072f69d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 
@@ -66,6 +68,12 @@ public class BlockScanner {
    */
   private Conf conf;
 
+  /**
+   * Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
+   * inside {@link #removeAllVolumeScanners}.
+   */
+  private long joinVolumeScannersTimeOutMs;
+
   @VisibleForTesting
   void setConf(Conf conf) {
     this.conf = conf;
@@ -179,6 +187,9 @@ public class BlockScanner {
 
   public BlockScanner(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
+    setJoinVolumeScannersTimeOutMs(
+        conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+            DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
     this.conf = new Conf(conf);
     if (isEnabled()) {
       LOG.info("Initialized block scanner with targetBytesPerSec {}",
@@ -198,6 +209,13 @@ public class BlockScanner {
     return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
   }
 
+  /**
+   * Returns true if there is any scanner thread registered.
+   */
+  public synchronized boolean hasAnyRegisteredScanner() {
+    return !scanners.isEmpty();
+  }
+
  /**
   * Set up a scanner for the given block pool and volume.
   *
@@ -262,7 +280,10 @@ public class BlockScanner {
   /**
    * Stops and removes all volume scanners.<p/>
    *
-   * This function will block until all the volume scanners have stopped.
+   * This function is called on shutdown. It will return even if some of
+   * the scanners don't terminate in time. Since the scanners are daemon
+   * threads and do not alter the block content, it is safe to ignore
+   * such conditions on shutdown.
    */
   public synchronized void removeAllVolumeScanners() {
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@@ -270,7 +291,7 @@ public class BlockScanner {
     }
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
       Uninterruptibles.joinUninterruptibly(entry.getValue(),
-          5, TimeUnit.MINUTES);
+          getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
     }
     scanners.clear();
   }
@@ -346,6 +367,14 @@ public class BlockScanner {
     scanner.markSuspectBlock(block);
   }
 
+  public long getJoinVolumeScannersTimeOutMs() {
+    return joinVolumeScannersTimeOutMs;
+  }
+
+  public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
+    this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
+  }
+
   @InterfaceAudience.Private
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
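
The core of the BlockScanner change is the bounded join above. A minimal
sketch of the pattern, assuming a scanner thread and the configured timeout
are in scope ("scannerThread" is a stand-in name):

    // Guava's joinUninterruptibly returns once the timeout elapses, even if
    // the thread is still alive. VolumeScanner threads are daemons and do
    // not alter block content, so shutdown can safely proceed without them.
    Uninterruptibles.joinUninterruptibly(scannerThread,
        joinVolumeScannersTimeOutMs, TimeUnit.MILLISECONDS);

Previously the join was hard-coded to 5 minutes per scanner, so a single
slow volume could stall DataNode shutdown for minutes.
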
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b60ed53..3788cd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1579,7 +1579,9 @@ public class DataNode extends ReconfigurableBase
       // a block pool id
       String bpId = bpos.getBlockPoolId();
 
-      blockScanner.disableBlockPoolId(bpId);
+      if (blockScanner.hasAnyRegisteredScanner()) {
+        blockScanner.disableBlockPoolId(bpId);
+      }
 
       if (data != null) {
         data.shutdownBlockPool(bpId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index b530536..ab8a70d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -641,12 +641,14 @@ public class VolumeScanner extends Thread {
         LOG.error("{} exiting because of exception ", this, e);
       }
       LOG.info("{} exiting.", this);
+      VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
       // Save the current position of all block iterators and close them.
       for (BlockIterator iter : blockIters) {
         saveBlockIterator(iter);
         IOUtils.cleanup(null, iter);
       }
     } finally {
+      VolumeScannerCBInjector.get().terminationCallBack(this);
       // When the VolumeScanner exits, release the reference we were holding
       // on the volume.  This will allow the volume to be removed later.
       IOUtils.cleanup(null, ref);
@@ -666,6 +668,7 @@ public class VolumeScanner extends Thread {
     stopping = true;
     notify();
     this.interrupt();
+    VolumeScannerCBInjector.get().shutdownCallBack(this);
   }
 
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
new file mode 100644
index 0000000..5798bd1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting call backs in {@link VolumeScanner}
+ * and {@link BlockScanner} tests.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class VolumeScannerCBInjector {
+  private static VolumeScannerCBInjector instance =
+      new VolumeScannerCBInjector();
+
+  public static VolumeScannerCBInjector get() {
+    return instance;
+  }
+
+  public static void set(VolumeScannerCBInjector injector) {
+    instance = injector;
+  }
+
+  public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
+  }
+
+  public void shutdownCallBack(final VolumeScanner volumeScanner) {
+  }
+
+  public void terminationCallBack(final VolumeScanner volumeScanner) {
+  }
+}
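
A sketch of the intended test usage (the real pattern appears in
TestBlockScanner below); the anonymous subclass here is illustrative:

    // Swap in an injector for the duration of a test, then restore it.
    VolumeScannerCBInjector prev = VolumeScannerCBInjector.get();
    try {
      VolumeScannerCBInjector.set(new VolumeScannerCBInjector() {
        @Override
        public void shutdownCallBack(VolumeScanner volumeScanner) {
          // e.g. record the scanner, or delay its reaction to the interrupt
        }
      });
      // ... exercise the DataNode shutdown path ...
    } finally {
      VolumeScannerCBInjector.set(prev);
    }
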
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ad9104e..c74dddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1409,6 +1409,15 @@
 </property>
 
 <property>
+  <name>dfs.block.scanner.volume.join.timeout.ms</name>
+  <value>5000</value>
+  <description>
+    The amount of time in milliseconds that the BlockScanner times out waiting
+    for the VolumeScanner thread to join during a shutdown call.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4194304</value>
   <description>
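
Deployments can override the bound under the same key in hdfs-site.xml; in
code, as the new tests below do, the equivalent is a single call (the
10-second value here is only an example):

    // Allow scanners up to 10 seconds to join instead of the 5s default.
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
        10000L);
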
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 067b47c..d313e0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
@@ -62,8 +63,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.base.Supplier;
@@ -154,6 +157,13 @@ public class MiniDFSCluster implements AutoCloseable {
       = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
       = DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
+  /**
+   * For the JUnit tests, this is the default value of the amount of time
+   * in milliseconds that the BlockScanner times out waiting for the
+   * VolumeScanner thread to join during a shutdown call.
+   */
+  public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
+      TimeUnit.SECONDS.toMillis(30);
 
   // Changing this default may break some tests that assume it is 2.
   private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@@ -200,8 +210,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
         conf.set(HDFS_MINIDFS_BASEDIR,
             new File(getBaseDirectory()).getAbsolutePath());
@@ -210,8 +219,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf, File basedir) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == basedir) {
         throw new IllegalArgumentException(
             "MiniDFSCluster base directory cannot be null");
@@ -475,6 +483,19 @@ public class MiniDFSCluster implements AutoCloseable {
     public MiniDFSCluster build() throws IOException {
       return new MiniDFSCluster(this);
     }
+
+    /**
+     * Initializes default values for the cluster.
+     */
+    private void initDefaultConfigurations() {
+      long defaultScannerVolumeTimeOut =
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+              DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          defaultScannerVolumeTimeOut);
+      this.storagesPerDatanode =
+          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+    }
   }
   
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 49e3226..fe65371 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@@ -32,9 +33,11 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Supplier;
@@ -93,9 +96,19 @@ public class TestBlockScanner {
     TestContext(Configuration conf, int numNameServices) throws Exception {
       this.numNameServices = numNameServices;
       File basedir = new File(GenericTestUtils.getRandomizedTempPath());
+      long volumeScannerTimeOutFromConf =
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
+      long expectedVScannerTimeOut =
+          volumeScannerTimeOutFromConf == -1
+              ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
+              : volumeScannerTimeOutFromConf;
       MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
           numDataNodes(1).
           storagesPerDatanode(1);
+      // verify that the builder was initialized to get the default
+      // configuration designated for JUnit tests.
+      assertEquals(expectedVScannerTimeOut,
+            conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
       if (numNameServices > 1) {
         bld.nnTopology(MiniDFSNNTopology.
               simpleFederatedTopology(numNameServices));
@@ -973,4 +986,139 @@ public class TestBlockScanner {
       info.blocksScanned = 0;
     }
   }
+
+  /**
+   * Test that a DN does not wait for the VolumeScanners to finish before
+   * shutting down.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testFastDatanodeShutdown() throws Exception {
+    // set the joinTimeOut to a value smaller than the completion time of the
+    // VolumeScanner.
+    testDatanodeShutDown(50L, 1000L, true);
+  }
+
+  /**
+   * Test that a DN waits for the VolumeScanners to finish before shutting
+   * down.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testSlowDatanodeShutdown() throws Exception {
+    // Set the joinTimeOut to a value larger than the completion time of the
+    // volume scanner
+    testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
+        false);
+  }
+
+  private void testDatanodeShutDown(final long joinTimeOutMS,
+      final long delayMS, boolean isFastShutdown) throws Exception {
+    VolumeScannerCBInjector prevVolumeScannerCBInject =
+        VolumeScannerCBInjector.get();
+    try {
+      DelayVolumeScannerResponseToInterrupt injectDelay =
+          new DelayVolumeScannerResponseToInterrupt(delayMS);
+      VolumeScannerCBInjector.set(injectDelay);
+      Configuration conf = new Configuration();
+      conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+      conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+          TestScanResultHandler.class.getName());
+      conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          joinTimeOutMS);
+      final TestContext ctx = new TestContext(conf, 1);
+      final int numExpectedBlocks = 10;
+      ctx.createFiles(0, numExpectedBlocks, 1);
+      final TestScanResultHandler.Info info =
+          TestScanResultHandler.getInfo(ctx.volumes.get(0));
+      synchronized (info) {
+        info.sem = new Semaphore(5);
+        info.shouldRun = true;
+        info.notify();
+      }
+      // make sure that the scanners are making progress
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          synchronized (info) {
+            return info.blocksScanned >= 1;
+          }
+        }
+      }, 3, 30000);
+      // mark the time at which the shutdown starts
+      long startShutdownTime = Time.monotonicNow();
+      ctx.datanode.shutdown();
+      long endShutdownTime = Time.monotonicNow();
+      long totalTimeShutdown = endShutdownTime - startShutdownTime;
+
+      if (isFastShutdown) {
+        assertTrue("total shutdown time of DN must be smaller than "
+                + "VolumeScanner Response time: " + totalTimeShutdown,
+            totalTimeShutdown < delayMS
+                && totalTimeShutdown >= joinTimeOutMS);
+        // wait for scanners to terminate before we move to the next test.
+        injectDelay.waitForScanners();
+        return;
+      }
+      assertTrue("total shutdown time of DN must be larger than " +
+              "VolumeScanner Response time: " + totalTimeShutdown,
+          totalTimeShutdown >= delayMS
+              && totalTimeShutdown < joinTimeOutMS);
+    } finally {
+      // restore the VolumeScanner callback injector.
+      VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
+    }
+  }
+
+  private static class DelayVolumeScannerResponseToInterrupt extends
+      VolumeScannerCBInjector {
+    final private long delayAmountNS;
+    final private Map<VolumeScanner, Boolean> scannersToShutDown;
+
+    DelayVolumeScannerResponseToInterrupt(long delayMS) {
+      delayAmountNS =
+          TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS);
+      scannersToShutDown = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
+      long remainingTimeNS = delayAmountNS;
+      // busy delay without sleep().
+      long startTime = Time.monotonicNowNanos();
+      long endTime = startTime + remainingTimeNS;
+      long currTime, waitTime = 0;
+      while ((currTime = Time.monotonicNowNanos()) < endTime) {
+        // Empty loop body: busy-wait instead of sleep(), since the thread
+        // may already be interrupted and sleep() would throw immediately.
+        waitTime = currTime - startTime;
+      }
+      LOG.info("VolumeScanner {} finished delayed Task after {}",
+          volumeScanner.toString(),
+          TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public void shutdownCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.put(volumeScanner, true);
+    }
+
+    @Override
+    public void terminationCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.remove(volumeScanner);
+    }
+
+    public void waitForScanners() throws TimeoutException,
+        InterruptedException {
+      GenericTestUtils.waitFor(
+          new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              return scannersToShutDown.isEmpty();
+            }
+          }, 10, 120000);
+    }
+  }
 }
