This is an automated email from the ASF dual-hosted git repository. kihwal pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new ab6b568 HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein. ab6b568 is described below commit ab6b5681e8baf43b5d6a50cc42c65c7a7a1760d7 Author: Kihwal Lee <kih...@apache.org> AuthorDate: Wed Jun 16 11:38:30 2021 -0500 HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein. --- .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 10 ++ .../hadoop/hdfs/server/datanode/BlockScanner.java | 33 ++++- .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../hadoop/hdfs/server/datanode/VolumeScanner.java | 3 + .../server/datanode/VolumeScannerCBInjector.java | 51 +++++++ .../src/main/resources/hdfs-default.xml | 9 ++ .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 29 +++- .../hdfs/server/datanode/TestBlockScanner.java | 148 +++++++++++++++++++++ 8 files changed, 280 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 1e53a2e..e71ed95 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -600,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks. 
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second"; public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L; + /** + * The amount of time in milliseconds that the BlockScanner times out waiting + * for the VolumeScanner thread to join during a shutdown call. + */ + public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY = + "dfs.block.scanner.volume.join.timeout.ms"; + public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT = + TimeUnit.SECONDS.toMillis(5); public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index 3d97022..072f69d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; @@ -66,6 +68,12 @@ public class 
BlockScanner { */ private Conf conf; + /** + * Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop + * inside {@link #removeAllVolumeScanners}. + */ + private long joinVolumeScannersTimeOutMs; + @VisibleForTesting void setConf(Conf conf) { this.conf = conf; @@ -179,6 +187,9 @@ public class BlockScanner { public BlockScanner(DataNode datanode, Configuration conf) { this.datanode = datanode; + setJoinVolumeScannersTimeOutMs( + conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, + DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT)); this.conf = new Conf(conf); if (isEnabled()) { LOG.info("Initialized block scanner with targetBytesPerSec {}", @@ -198,6 +209,13 @@ public class BlockScanner { return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0); } + /** + * Returns true if there is any scanner thread registered. + */ + public synchronized boolean hasAnyRegisteredScanner() { + return !scanners.isEmpty(); + } + /** * Set up a scanner for the given block pool and volume. * @@ -262,7 +280,10 @@ public class BlockScanner { /** * Stops and removes all volume scanners.<p/> * - * This function will block until all the volume scanners have stopped. + * This function is called on shutdown. It will return even if some of + * the scanners don't terminate in time. Since the scanners are daemon + * threads and do not alter the block content, it is safe to ignore + * such conditions on shutdown. 
*/ public synchronized void removeAllVolumeScanners() { for (Entry<String, VolumeScanner> entry : scanners.entrySet()) { @@ -270,7 +291,7 @@ public class BlockScanner { } for (Entry<String, VolumeScanner> entry : scanners.entrySet()) { Uninterruptibles.joinUninterruptibly(entry.getValue(), - 5, TimeUnit.MINUTES); + getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS); } scanners.clear(); } @@ -346,6 +367,14 @@ public class BlockScanner { scanner.markSuspectBlock(block); } + public long getJoinVolumeScannersTimeOutMs() { + return joinVolumeScannersTimeOutMs; + } + + public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) { + this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs; + } + @InterfaceAudience.Private public static class Servlet extends HttpServlet { private static final long serialVersionUID = 1L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b60ed53..3788cd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1579,7 +1579,9 @@ public class DataNode extends ReconfigurableBase // a block pool id String bpId = bpos.getBlockPoolId(); - blockScanner.disableBlockPoolId(bpId); + if (blockScanner.hasAnyRegisteredScanner()) { + blockScanner.disableBlockPoolId(bpId); + } if (data != null) { data.shutdownBlockPool(bpId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index b530536..ab8a70d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -641,12 +641,14 @@ public class VolumeScanner extends Thread { LOG.error("{} exiting because of exception ", this, e); } LOG.info("{} exiting.", this); + VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this); // Save the current position of all block iterators and close them. for (BlockIterator iter : blockIters) { saveBlockIterator(iter); IOUtils.cleanup(null, iter); } } finally { + VolumeScannerCBInjector.get().terminationCallBack(this); // When the VolumeScanner exits, release the reference we were holding // on the volume. This will allow the volume to be removed later. IOUtils.cleanup(null, ref); @@ -666,6 +668,7 @@ public class VolumeScanner extends Thread { stopping = true; notify(); this.interrupt(); + VolumeScannerCBInjector.get().shutdownCallBack(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java new file mode 100644 index 0000000..5798bd1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used for injecting call backs in {@link VolumeScanner} + * and {@link BlockScanner} tests. + * Calls into this are a no-op in production code. + */ +@VisibleForTesting +@InterfaceAudience.Private +public class VolumeScannerCBInjector { + private static VolumeScannerCBInjector instance = + new VolumeScannerCBInjector(); + + public static VolumeScannerCBInjector get() { + return instance; + } + + public static void set(VolumeScannerCBInjector injector) { + instance = injector; + } + + public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) { + } + + public void shutdownCallBack(final VolumeScanner volumeScanner) { + } + + public void terminationCallBack(final VolumeScanner volumeScanner) { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ad9104e..c74dddb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1409,6 +1409,15 @@ </property> <property> + <name>dfs.block.scanner.volume.join.timeout.ms</name> + <value>5000</value> + <description> + The amount of time in milliseconds that the BlockScanner times out waiting + for the VolumeScanner thread to join during a shutdown call. 
+ </description> +</property> + +<property> <name>dfs.datanode.readahead.bytes</name> <value>4194304</value> <description> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 067b47c..d313e0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; @@ -62,8 +63,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.base.Supplier; @@ -154,6 +157,13 @@ public class MiniDFSCluster implements AutoCloseable { = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing"; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY = DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing"; + /** + * For the Junit tests, this is the default value of the amount of time + * in
milliseconds that the BlockScanner times out waiting for the + * thread to join during a shutdown call. + */ + public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC = + TimeUnit.SECONDS.toMillis(30); // Changing this default may break some tests that assume it is 2. private static final int DEFAULT_STORAGES_PER_DATANODE = 2; @@ -200,8 +210,7 @@ public class MiniDFSCluster implements AutoCloseable { public Builder(Configuration conf) { this.conf = conf; - this.storagesPerDatanode = - FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs(); + initDefaultConfigurations(); if (null == conf.get(HDFS_MINIDFS_BASEDIR)) { conf.set(HDFS_MINIDFS_BASEDIR, new File(getBaseDirectory()).getAbsolutePath()); @@ -210,8 +219,7 @@ public class MiniDFSCluster implements AutoCloseable { public Builder(Configuration conf, File basedir) { this.conf = conf; - this.storagesPerDatanode = - FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs(); + initDefaultConfigurations(); if (null == basedir) { throw new IllegalArgumentException( "MiniDFSCluster base directory cannot be null"); @@ -475,6 +483,19 @@ public class MiniDFSCluster implements AutoCloseable { public MiniDFSCluster build() throws IOException { return new MiniDFSCluster(this); } + + /** + * Initializes default values for the cluster. 
+ */ + private void initDefaultConfigurations() { + long defaultScannerVolumeTimeOut = + conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, + DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC); + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, + defaultScannerVolumeTimeOut); + this.storagesPerDatanode = + FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs(); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index 49e3226..fe65371 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS; @@ -32,9 +33,11 @@ import java.io.IOException; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.google.common.base.Supplier; @@ -93,9 +96,19 @@ public class TestBlockScanner { TestContext(Configuration conf, int numNameServices) throws Exception { this.numNameServices = numNameServices; File basedir = new File(GenericTestUtils.getRandomizedTempPath()); + long volumeScannerTimeOutFromConf = + 
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1); + long expectedVScannerTimeOut = + volumeScannerTimeOutFromConf == -1 + ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC + : volumeScannerTimeOutFromConf; MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir). numDataNodes(1). storagesPerDatanode(1); + // verify that the builder was initialized to get the default + // configuration designated for Junit tests. + assertEquals(expectedVScannerTimeOut, + conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1)); if (numNameServices > 1) { bld.nnTopology(MiniDFSNNTopology. simpleFederatedTopology(numNameServices)); @@ -973,4 +986,139 @@ public class TestBlockScanner { info.blocksScanned = 0; } } + + /** + * Test a DN does not wait for the VolumeScanners to finish before shutting + * down. + * + * @throws Exception + */ + @Test(timeout=120000) + public void testFastDatanodeShutdown() throws Exception { + // set the joinTimeOut to a value smaller than the completion time of the + // VolumeScanner. + testDatanodeShutDown(50L, 1000L, true); + } + + /** + * Test a DN waits for the VolumeScanners to finish before shutting down. 
+ * + * @throws Exception + */ + @Test(timeout=120000) + public void testSlowDatanodeShutdown() throws Exception { + // Set the joinTimeOut to a value larger than the completion time of the + // volume scanner + testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L, + false); + } + + private void testDatanodeShutDown(final long joinTimeOutMS, + final long delayMS, boolean isFastShutdown) throws Exception { + VolumeScannerCBInjector prevVolumeScannerCBInject = + VolumeScannerCBInjector.get(); + try { + DelayVolumeScannerResponseToInterrupt injectDelay = + new DelayVolumeScannerResponseToInterrupt(delayMS); + VolumeScannerCBInjector.set(injectDelay); + Configuration conf = new Configuration(); + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L); + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, + joinTimeOutMS); + final TestContext ctx = new TestContext(conf, 1); + final int numExpectedBlocks = 10; + ctx.createFiles(0, numExpectedBlocks, 1); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.sem = new Semaphore(5); + info.shouldRun = true; + info.notify(); + } + // make sure that the scanners are making progress + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (info) { + return info.blocksScanned >= 1; + } + } + }, 3, 30000); + // mark the time where the shutdown starts + long startShutdownTime = Time.monotonicNow(); + ctx.datanode.shutdown(); + long endShutdownTime = Time.monotonicNow(); + long totalTimeShutdown = endShutdownTime - startShutdownTime; + + if (isFastShutdown) { + assertTrue("total shutdown time of DN must be smaller than " + + "VolumeScanner Response time: " + totalTimeShutdown, + totalTimeShutdown < delayMS + && totalTimeShutdown >= joinTimeOutMS); +
// wait for scanners to terminate before we move to the next test. + injectDelay.waitForScanners(); + return; + } + assertTrue("total shutdown time of DN must be larger than " + + "VolumeScanner Response time: " + totalTimeShutdown, + totalTimeShutdown >= delayMS + && totalTimeShutdown < joinTimeOutMS); + } finally { + // restore the VolumeScanner callback injector. + VolumeScannerCBInjector.set(prevVolumeScannerCBInject); + } + } + + private static class DelayVolumeScannerResponseToInterrupt extends + VolumeScannerCBInjector { + final private long delayAmountNS; + final private Map<VolumeScanner, Boolean> scannersToShutDown; + + DelayVolumeScannerResponseToInterrupt(long delayMS) { + delayAmountNS = + TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS); + scannersToShutDown = new ConcurrentHashMap<>(); + } + + @Override + public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) { + long remainingTimeNS = delayAmountNS; + // busy delay without sleep(). + long startTime = Time.monotonicNowNanos(); + long endTime = startTime + remainingTimeNS; + long currTime, waitTime = 0; + while ((currTime = Time.monotonicNowNanos()) < endTime) { + // empty loop. No need to sleep because the thread could be in an + // interrupt mode. 
+ waitTime = currTime - startTime; + } + LOG.info("VolumeScanner {} finished delayed Task after {}", + volumeScanner.toString(), + TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.MILLISECONDS)); + } + + @Override + public void shutdownCallBack(VolumeScanner volumeScanner) { + scannersToShutDown.put(volumeScanner, true); + } + + @Override + public void terminationCallBack(VolumeScanner volumeScanner) { + scannersToShutDown.remove(volumeScanner); + } + + public void waitForScanners() throws TimeoutException, + InterruptedException { + GenericTestUtils.waitFor( + new Supplier<Boolean>() { + @Override + public Boolean get() { + return scannersToShutDown.isEmpty(); + } + }, 10, 120000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org