This is an automated email from the ASF dual-hosted git repository. reidchan pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push: new fdccda3 HBASE-22880 Move the DirScanPool out and do not use static field fdccda3 is described below commit fdccda3197bd5c9dcbc38e8b49ae19cc8c5d9b13 Author: Dean_19 <35092554+zha...@users.noreply.github.com> AuthorDate: Sat Aug 24 13:13:32 2019 +0800 HBASE-22880 Move the DirScanPool out and do not use static field Signed-off-by Reid Chan <reidc...@apache.org> --- .../org/apache/hadoop/hbase/master/HMaster.java | 31 ++--- .../hadoop/hbase/master/cleaner/CleanerChore.java | 146 ++++----------------- .../hadoop/hbase/master/cleaner/DirScanPool.java | 111 ++++++++++++++++ .../hadoop/hbase/master/cleaner/HFileCleaner.java | 8 +- .../hadoop/hbase/master/cleaner/LogCleaner.java | 10 +- .../hadoop/hbase/backup/TestHFileArchiving.java | 33 +++-- .../example/TestZooKeeperTableArchiveClient.java | 17 +-- .../hbase/master/cleaner/TestCleanerChore.java | 49 ++++--- .../hbase/master/cleaner/TestHFileCleaner.java | 21 +-- .../hbase/master/cleaner/TestHFileLinkCleaner.java | 24 +++- .../hbase/master/cleaner/TestLogsCleaner.java | 8 +- 11 files changed, 252 insertions(+), 206 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0bc6c91..7a49623 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; -import org.apache.hadoop.hbase.master.cleaner.CleanerChore; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; @@ -330,6 +330,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private PeriodicDoMetrics periodicDoMetricsChore = null; CatalogJanitor catalogJanitorChore; + private DirScanPool cleanerPool; private ReplicationZKLockCleanerChore replicationZKLockCleanerChore; private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore; private LogCleaner logCleaner; @@ -895,6 +896,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { (System.currentTimeMillis() - masterActiveTime) / 1000.0f)); this.masterFinishedInitializationTime = System.currentTimeMillis(); configurationManager.registerObserver(this.balancer); + configurationManager.registerObserver(this.cleanerPool); configurationManager.registerObserver(this.hfileCleaner); configurationManager.registerObserver(this.logCleaner); @@ -1234,22 +1236,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server { this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); startProcedureExecutor(); - // Initial cleaner chore - CleanerChore.initChorePool(conf); - // Start log cleaner thread - int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); - this.logCleaner = - new LogCleaner(cleanerInterval, - this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf), - getMasterFileSystem().getOldLogDir()); - getChoreService().scheduleChore(logCleaner); - + // Create cleaner thread pool + cleanerPool = new DirScanPool(conf); + // Start log cleaner thread + int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000); + this.logCleaner = new LogCleaner(cleanerInterval, this, conf, + getMasterFileSystem().getOldLogDir().getFileSystem(conf), + getMasterFileSystem().getOldLogDir(), cleanerPool); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Map<String, Object> params = new HashMap<String, Object>(); params.put(MASTER, this); - this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() - .getFileSystem(), archiveDir, params); + this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, + getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); getChoreService().scheduleChore(hfileCleaner); serviceStarted = true; if (LOG.isTraceEnabled()) { @@ -1291,8 +1290,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } stopChores(); super.stopServiceThreads(); - CleanerChore.shutDownChorePool(); - + if (cleanerPool != null) { + cleanerPool.shutdownNow(); + cleanerPool = null; + } // Wait for all the remaining region servers to report in IFF we were // running a cluster shutdown AND we were NOT aborting. if (!isAborted() && this.serverManager != null && diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index a360bd6..db0e897 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -23,8 +23,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,7 +36,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FileStatusFilter; import org.apache.hadoop.ipc.RemoteException; @@ -53,11 +50,8 @@ import com.google.common.collect.Lists; * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param <T> Cleaner delegate class that is dynamically loaded from configuration */ -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", - justification="Static pool will be only updated once.") @InterfaceAudience.Private -public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore - implements ConfigurationObserver { +public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName()); private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); @@ -69,85 +63,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu * while latter will use only 1 thread for chore to scan dir. */ public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; - private static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; - - private static class DirScanPool { - int size; - ForkJoinPool pool; - int cleanerLatch; - AtomicBoolean reconfigNotification; - - DirScanPool(Configuration conf) { - String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); - size = calculatePoolSize(poolSize); - // poolSize may be 0 or 0.0 from a careless configuration, - // double check to make sure. - size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size; - pool = new ForkJoinPool(size); - LOG.info("Cleaner pool size is " + size); - reconfigNotification = new AtomicBoolean(false); - cleanerLatch = 0; - } - - /** - * Checks if pool can be updated. If so, mark for update later. - * @param conf configuration - */ - synchronized void markUpdate(Configuration conf) { - int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); - if (newSize == size) { - LOG.trace("Size from configuration is same as previous=" + newSize + - " no need to update."); - return; - } - size = newSize; - // Chore is working, update it later. - reconfigNotification.set(true); - } - - /** - * Update pool with new size. - */ - synchronized void updatePool(long timeout) { - long stopWaitTime = System.currentTimeMillis() + timeout; - while (cleanerLatch != 0 && timeout > 0) { - try { - wait(timeout); - timeout = stopWaitTime - System.currentTimeMillis(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - shutDownNow(); - LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size); - pool = new ForkJoinPool(size); - } - - synchronized void latchCountUp() { - cleanerLatch++; - } - - synchronized void latchCountDown() { - cleanerLatch--; - notifyAll(); - } - - @SuppressWarnings("FutureReturnValueIgnored") - synchronized void submit(ForkJoinTask task) { - pool.submit(task); - } - - synchronized void shutDownNow() { - if (pool == null || pool.isShutdown()) { - return; - } - pool.shutdownNow(); - } - } - // It may be waste resources for each cleaner chore own its pool, - // so let's make pool for all cleaner chores. - private static volatile DirScanPool POOL; + static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; + private final DirScanPool pool; protected final FileSystem fs; private final Path oldFileDir; @@ -156,22 +73,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu protected Map<String, Object> params; private AtomicBoolean enabled = new AtomicBoolean(true); - public static void initChorePool(Configuration conf) { - if (POOL == null) { - POOL = new DirScanPool(conf); - } - } - - public static void shutDownChorePool() { - if (POOL != null) { - POOL.shutDownNow(); - POOL = null; - } - } - public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, - FileSystem fs, Path oldFileDir, String confKey) { - this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); + FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { + this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null); } /** @@ -182,14 +86,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu * @param fs handle to the FS * @param oldFileDir the path to the archived files * @param confKey configuration key for the classes to instantiate + * @param pool the thread pool used to scan directories * @param params members could be used in cleaner */ - public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, - FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) { + public CleanerChore(String name, final int sleepPeriod, final Stoppable s, + Configuration conf, FileSystem fs, Path oldFileDir, String confKey, + DirScanPool pool, Map<String, Object> params) { super(name, s, sleepPeriod); - Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call" - + "CleanerChore.initChorePool(Configuration) before new a cleaner chore."); + Preconditions.checkNotNull(pool, "Chore's pool can not be null"); + this.pool = pool; this.fs = fs; this.oldFileDir = oldFileDir; this.conf = conf; @@ -253,11 +159,6 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } } - @Override - public void onConfigurationChange(Configuration conf) { - POOL.markUpdate(conf); - } - /** * A utility method to create new instances of LogCleanerDelegate based on the class name of the * LogCleanerDelegate. @@ -285,7 +186,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu protected void chore() { if (getEnabled()) { try { - POOL.latchCountUp(); + pool.latchCountUp(); if (runCleaner()) { if (LOG.isTraceEnabled()) { LOG.trace("Cleaned all WALs under " + oldFileDir); @@ -296,15 +197,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } } } finally { - POOL.latchCountDown(); - } - // After each cleaner chore, checks if received reconfigure notification while cleaning. - // First in cleaner turns off notification, to avoid another cleaner updating pool again. - if (POOL.reconfigNotification.compareAndSet(true, false)) { - // This cleaner is waiting for other cleaners finishing their jobs. - // To avoid missing next chore, only wait 0.8 * period, then shutdown. - POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); + pool.latchCountDown(); } + // This cleaner is waiting for other cleaners finishing their jobs. + // To avoid missing next chore, only wait 0.8 * period, then shutdown. + pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); } else { LOG.trace("Cleaner chore disabled! Not cleaning."); } @@ -312,7 +209,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu public Boolean runCleaner() { CleanerTask task = new CleanerTask(this.oldFileDir, true); - POOL.submit(task); + pool.execute(task); return task.join(); } @@ -407,7 +304,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu @VisibleForTesting int getChorePoolSize() { - return POOL.size; + return pool.getSize(); } /** @@ -426,10 +323,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } /** - * Attemps to clean up a directory, its subdirectories, and files. - * Return value is true if everything was deleted. false on partial / total failures. + * Attemps to clean up a directory, its subdirectories, and files. Return value is true if + * everything was deleted. false on partial / total failures. */ - private class CleanerTask extends RecursiveTask<Boolean> { + private final class CleanerTask extends RecursiveTask<Boolean> { + private static final long serialVersionUID = -1584635903138015418L; private final Path dir; private final boolean root; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java new file mode 100644 index 0000000..f201ae2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; + +/** + * The thread pool used for scan directories + */ +@InterfaceAudience.Private +public class DirScanPool implements ConfigurationObserver { + private static final Log LOG = LogFactory.getLog(DirScanPool.class); + private volatile int size; + private ForkJoinPool pool; + private int cleanerLatch; + private boolean reconfigNotification; + + public DirScanPool(Configuration conf) { + String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); + size = CleanerChore.calculatePoolSize(poolSize); + // poolSize may be 0 or 0.0 from a careless configuration, + // double check to make sure. + size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; + pool = new ForkJoinPool(size); + LOG.info("Cleaner pool size is " + size); + cleanerLatch = 0; + } + + /** + * Checks if pool can be updated. If so, mark for update later. + * @param conf configuration + */ + @Override + public synchronized void onConfigurationChange(Configuration conf) { + int newSize = CleanerChore.calculatePoolSize( + conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE)); + if (newSize == size) { + LOG.trace("Size from configuration is same as previous=" + newSize + ", no need to update."); + return; + } + size = newSize; + // Chore is working, update it later. + reconfigNotification = true; + } + + synchronized void latchCountUp() { + cleanerLatch++; + } + + synchronized void latchCountDown() { + cleanerLatch--; + notifyAll(); + } + + synchronized void execute(ForkJoinTask<?> task) { + pool.execute(task); + } + + public synchronized void shutdownNow() { + if (pool == null || pool.isShutdown()) { + return; + } + pool.shutdownNow(); + } + + synchronized void tryUpdatePoolSize(long timeout) { + if (!reconfigNotification) { + return; + } + reconfigNotification = false; + long stopTime = System.currentTimeMillis() + timeout; + while (cleanerLatch != 0 && timeout > 0) { + try { + wait(timeout); + timeout = stopTime - System.currentTimeMillis(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + shutdownNow(); + LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size); + pool = new ForkJoinPool(size); + } + + public int getSize() { + return size; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 6691f66..48cb17e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -49,8 +49,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path directory) { - this(period, stopper, conf, fs, directory, null); + Path directory, DirScanPool pool) { + this(period, stopper, conf, fs, directory, pool, null); } // Configuration key for large/small throttle point @@ -114,9 +114,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme * @param params params could be used in subclass of BaseHFileCleanerDelegate */ public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path directory, Map<String, Object> params) { + Path directory, DirScanPool pool, Map<String, Object> params) { super("HFileCleaner", period, stopper, conf, fs, - directory, MASTER_HFILE_CLEANER_PLUGINS, params); + directory, MASTER_HFILE_CLEANER_PLUGINS, pool, params); throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); largeQueueInitSize = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index 8b60803..999e725 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.wal.DefaultWALProvider; /** @@ -44,7 +45,8 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider; * @see BaseLogCleanerDelegate */ @InterfaceAudience.Private -public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { +public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> + implements ConfigurationObserver { private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName()); public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; @@ -74,8 +76,8 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { * @param oldLogDir the path to the archived logs */ public LogCleaner(final int p, final Stoppable s, Configuration conf, FileSystem fs, - Path oldLogDir) { - super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); + Path oldLogDir, DirScanPool pool) { + super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, pool); this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); @@ -92,8 +94,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { @Override public void onConfigurationChange(Configuration conf) { - super.onConfigurationChange(conf); - int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); if (newSize == oldWALsCleaner.size()) { if (LOG.isDebugEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index e904711..850c32a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -24,7 +24,10 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +42,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -68,6 +73,8 @@ public class TestHFileArchiving { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + private static DirScanPool POOL; + /** * Setup the config for the cluster */ @@ -78,6 +85,8 @@ public class TestHFileArchiving { // We don't want the cleaner to remove files. The tests do that. UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); + + POOL = new DirScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { @@ -96,20 +105,13 @@ public class TestHFileArchiving { @After public void tearDown() throws Exception { // cleanup the archive directory - try { - clearArchiveDirectory(); - } catch (IOException e) { - Assert.fail("Failure to delete archive directory:" + e.getMessage()); - } + clearArchiveDirectory(); } @AfterClass public static void cleanupTest() throws Exception { - try { - UTIL.shutdownMiniCluster(); - } catch (Exception e) { - // NOOP; - } + UTIL.shutdownMiniCluster(); + POOL.shutdownNow(); } @Test @@ -370,7 +372,7 @@ public class TestHFileArchiving { Stoppable stoppable = new StoppableImplementation(); // The cleaner should be looping without long pauses to reproduce the race condition. - HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); + HFileCleaner cleaner = getHFileCleaner(stoppable, conf, fs, archiveDir); try { choreService.scheduleChore(cleaner); @@ -413,6 +415,15 @@ public class TestHFileArchiving { } } + // Avoid passing a null master to CleanerChore, see HBASE-21175 + private HFileCleaner getHFileCleaner(Stoppable stoppable, Configuration conf, FileSystem fs, + Path archiveDir) throws IOException { + Map<String, Object> params = new HashMap<>(); + params.put(HMaster.MASTER, UTIL.getMiniHBaseCluster().getMaster()); + HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir, POOL); + return Objects.requireNonNull(cleaner); + } + private void clearArchiveDirectory() throws IOException { UTIL.getTestFileSystem().delete( new Path(UTIL.getDefaultRootDirPath(), HConstants.HFILE_ARCHIVE_DIRECTORY), true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index b2139a8..967bb14 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.master.cleaner.CleanerChore; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -83,6 +83,7 @@ public class TestZooKeeperTableArchiveClient { private final List<Path> toCleanup = new ArrayList<Path>(); private static ClusterConnection CONNECTION; private static RegionServerServices rss; + private static DirScanPool POOL; /** * Setup the config for the cluster @@ -98,6 +99,7 @@ public class TestZooKeeperTableArchiveClient { String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); ZKUtil.createWithParents(watcher, archivingZNode); rss = mock(RegionServerServices.class); + POOL= new DirScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { @@ -125,12 +127,9 @@ public class TestZooKeeperTableArchiveClient { @AfterClass public static void cleanupTest() throws Exception { - try { - CONNECTION.close(); - UTIL.shutdownMiniZKCluster(); - } catch (Exception e) { - LOG.warn("problem shutting down cluster", e); - } + CONNECTION.close(); + UTIL.shutdownMiniZKCluster(); + POOL.shutdownNow(); } /** @@ -171,7 +170,6 @@ public class TestZooKeeperTableArchiveClient { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); - CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -226,7 +224,6 @@ public class TestZooKeeperTableArchiveClient { // setup the delegate Stoppable stop = new StoppableImplementation(); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); - CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -320,7 +317,7 @@ public class TestZooKeeperTableArchiveClient { Stoppable stop) { conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, LongTermArchivingHFileCleaner.class.getCanonicalName()); - return new HFileCleaner(1000, stop, conf, fs, archiveDir); + return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index f5e30d6..b110ffc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -51,17 +51,18 @@ public class TestCleanerChore { private static final Log LOG = LogFactory.getLog(TestCleanerChore.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; @BeforeClass public static void setup() { - CleanerChore.initChorePool(UTIL.getConfiguration()); + POOL = new DirScanPool(UTIL.getConfiguration()); } @AfterClass public static void cleanup() throws Exception { // delete and recreate the test directory, ensuring a clean test dir between tests UTIL.cleanupTestDir(); - CleanerChore.shutDownChorePool(); + POOL.shutdownNow(); } @@ -74,7 +75,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, NeverDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // create the directory layout in the directory to clean Path parent = new Path(testDir, "parent"); @@ -116,7 +118,8 @@ public class TestCleanerChore { } }; - AllValidPaths chore = new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL); // trouble talking to the filesystem Boolean result = chore.runCleaner(); @@ -147,7 +150,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // create the directory layout in the directory to clean Path parent = new Path(testDir, "parent"); @@ -188,7 +192,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -219,7 +224,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // also create a file in the top level directory Path topFile = new Path(testDir, "topFile"); @@ -250,7 +256,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -309,7 +316,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -353,7 +361,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // Enable cleaner chore.setEnabled(true); @@ -386,7 +395,8 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // Disable cleaner chore.setEnabled(false); @@ -419,7 +429,7 @@ public class TestCleanerChore { } // have at least 2 available processors/cores - int initPoolSize = availableProcessorNum / 2; + int initPoolSize = availableProcessorNum / 2; int changedPoolSize = availableProcessorNum; Stoppable stop = new StoppableImplementation(); @@ -430,7 +440,7 @@ public class TestCleanerChore { conf.set(confKey, AlwaysDelete.class.getName()); conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize)); final AllValidPaths chore = - new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); chore.setEnabled(true); // Create subdirs under testDir int dirNums = 6; @@ -454,7 +464,7 @@ public class TestCleanerChore { t.start(); // Change size of chore's pool conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize)); - chore.onConfigurationChange(conf); + POOL.onConfigurationChange(conf); assertEquals(changedPoolSize, chore.getChorePoolSize()); // Stop chore t.join(); @@ -469,14 +479,13 @@ public class TestCleanerChore { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); conf.set(CleanerChore.CHORE_POOL_SIZE, "2"); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); int numProcs = Runtime.getRuntime().availableProcessors(); // Sanity - assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs))); + assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs))); // The implementation does not allow us to set more threads than we have processors - assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs + 2))); + assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs + 2))); // Force us into the branch that is multiplying 0.0 against the number of processors - assertEquals(1, chore.calculatePoolSize("0.0")); + assertEquals(1, CleanerChore.calculatePoolSize("0.0")); } private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { @@ -496,8 +505,8 @@ public class TestCleanerChore { private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> { public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs, - Path oldFileDir, String confkey) { - super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey); + Path oldFileDir, String confkey, DirScanPool pool) { + super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool); } // all paths are valid diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 099d7ef..a59438d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -56,16 +56,19 @@ public class TestHFileCleaner { private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; + @BeforeClass public static void setupCluster() throws Exception { // have to use a minidfs cluster because the localfs doesn't modify file times correctly UTIL.startMiniDFSCluster(1); - CleanerChore.initChorePool(UTIL.getConfiguration()); + POOL = new DirScanPool(UTIL.getConfiguration()); } @AfterClass public static void shutdownCluster() throws IOException { UTIL.shutdownMiniDFSCluster(); + POOL.shutdownNow(); } @Test @@ -107,9 +110,10 @@ public class TestHFileCleaner { "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner"); conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl); Server server = new DummyServer(); - Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path archivedHfileDir = + new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); FileSystem fs = FileSystem.get(conf); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files final long createTime = System.currentTimeMillis(); @@ -172,11 +176,12 @@ public class TestHFileCleaner { // no cleaner policies = delete all files conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ""); Server server = new DummyServer(); - Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path archivedHfileDir = + new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // make all the directories for archiving files Path table = new Path(archivedHfileDir, "table"); @@ -269,7 +274,7 @@ public class TestHFileCleaner { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // clean up archive directory fs.delete(archivedHfileDir, true); fs.mkdirs(archivedHfileDir); @@ -298,7 +303,7 @@ public class TestHFileCleaner { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // clean up archive directory fs.delete(archivedHfileDir, true); fs.mkdirs(archivedHfileDir); @@ -339,7 +344,7 @@ public class TestHFileCleaner { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint()); Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java index 76709a4..b2df5ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java @@ -36,23 +36,36 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.io.HFileLink; -import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; /** - * Test the HFileLink Cleaner. - * HFiles with links cannot be deleted until a link is present. + * Test the HFileLink Cleaner. HFiles with links cannot be deleted until a link is present. */ -@Category(SmallTests.class) +@Category(MediumTests.class) public class TestHFileLinkCleaner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; + + @BeforeClass + public static void setUp() { + POOL = new DirScanPool(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() { + POOL.shutdownNow(); + } + @Test public void testHFileLinkCleaning() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -96,8 +109,7 @@ public class TestHFileLinkCleaner { final long ttl = 1000; conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl); Server server = new DummyServer(); - CleanerChore.initChorePool(conf); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir, POOL); // Link backref cannot be removed cleaner.chore(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 2578ec9..b295484 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -71,6 +71,7 @@ import org.mockito.stubbing.Answer; public class TestLogsCleaner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; /** * @throws java.lang.Exception @@ -79,7 +80,7 @@ public class TestLogsCleaner { public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniDFSCluster(1); - CleanerChore.initChorePool(TEST_UTIL.getConfiguration()); + POOL = new DirScanPool(TEST_UTIL.getConfiguration()); } /** @@ -89,6 +90,7 @@ public class TestLogsCleaner { public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniDFSCluster(); + POOL.shutdownNow(); } @Test @@ -149,7 +151,7 @@ public class TestLogsCleaner { assertEquals(34, fs.listStatus(oldLogDir).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir, POOL); cleaner.chore(); @@ -272,7 +274,7 @@ public class TestLogsCleaner { Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), HConstants.HREGION_OLDLOGDIR_NAME); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); + final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir, POOL); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec());