HBASE-20352 [Chore] Backport HBASE-18309 (Support multi threads in CleanerChore) to branch-1
Signed-off-by: Yu Li <l...@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/193d1dcb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/193d1dcb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/193d1dcb Branch: refs/heads/branch-1.3 Commit: 193d1dcb72c22252fc86ee8433c765c42349d3cc Parents: 35e94c9 Author: Reid Chan <reidddc...@outlook.com> Authored: Wed Apr 11 14:16:08 2018 +0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 18:08:18 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/master/HMaster.java | 4 + .../hbase/master/cleaner/CleanerChore.java | 405 +++++++++++++++---- .../hadoop/hbase/master/cleaner/LogCleaner.java | 181 +++++++++ .../org/apache/hadoop/hbase/util/FSUtils.java | 75 ++++ .../hadoop/hbase/util/FileStatusFilter.java | 36 ++ .../TestZooKeeperTableArchiveClient.java | 3 + .../hbase/master/cleaner/TestCleanerChore.java | 164 +++++++- .../hbase/master/cleaner/TestHFileCleaner.java | 1 + .../master/cleaner/TestHFileLinkCleaner.java | 1 + .../hbase/master/cleaner/TestLogsCleaner.java | 57 +++ 10 files changed, 837 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0906fca..67c7787 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; +import org.apache.hadoop.hbase.master.cleaner.CleanerChore; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; @@ -859,6 +860,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); configurationManager.registerObserver(this.balancer); + configurationManager.registerObserver(this.logCleaner); // Set master as 'initialized'. setInitialized(true); @@ -1176,6 +1178,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); startProcedureExecutor(); + // Initial cleaner chore + CleanerChore.initChorePool(conf); // Start log cleaner thread int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index d54b7aa..dc614fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -21,6 +21,11 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,12 +33,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.FileStatusFilter; +import org.apache.hadoop.ipc.RemoteException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -42,21 +51,113 @@ import com.google.common.collect.Lists; * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param <T> Cleaner delegate class that is dynamically loaded from configuration */ -public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", + justification="Static pool will be only updated once.") +@InterfaceAudience.Private +public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore + implements ConfigurationObserver { private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName()); + private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); - private final FileSystem fs; + /** + * If it is an integer and >= 1, it would be the size; + * if 0.0 < size <= 1.0, size would be available processors * size. + * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores, + * while latter will use only 1 thread for chore to scan dir. + */ + public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; + private static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; + + private static class DirScanPool { + int size; + ForkJoinPool pool; + int cleanerLatch; + AtomicBoolean reconfigNotification; + + DirScanPool(Configuration conf) { + String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); + size = calculatePoolSize(poolSize); + // poolSize may be 0 or 0.0 from a careless configuration, + // double check to make sure. + size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size; + pool = new ForkJoinPool(size); + LOG.info("Cleaner pool size is " + size); + reconfigNotification = new AtomicBoolean(false); + cleanerLatch = 0; + } + + /** + * Checks if pool can be updated. If so, mark for update later. + * @param conf configuration + */ + synchronized void markUpdate(Configuration conf) { + int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); + if (newSize == size) { + LOG.trace("Size from configuration is same as previous=" + newSize + + " no need to update."); + return; + } + size = newSize; + // Chore is working, update it later. + reconfigNotification.set(true); + } + + /** + * Update pool with new size. + */ + synchronized void updatePool(long timeout) { + long stopWaitTime = System.currentTimeMillis() + timeout; + while (cleanerLatch != 0 && timeout > 0) { + try { + wait(timeout); + timeout = stopWaitTime - System.currentTimeMillis(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + pool.shutdownNow(); + LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size); + pool = new ForkJoinPool(size); + } + + synchronized void latchCountUp() { + cleanerLatch++; + } + + synchronized void latchCountDown() { + cleanerLatch--; + notifyAll(); + } + + @SuppressWarnings({"FutureReturnValueIgnored","rawtypes","unchecked"}) + synchronized void submit(ForkJoinTask task) { + pool.submit(task); + } + } + // It may be waste resources for each cleaner chore own its pool, + // so let's make pool for all cleaner chores. + private static volatile DirScanPool POOL; + + protected final FileSystem fs; private final Path oldFileDir; private final Configuration conf; protected List<T> cleanersChain; protected Map<String, Object> params; + private AtomicBoolean enabled = new AtomicBoolean(true); public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, FileSystem fs, Path oldFileDir, String confKey) { this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); } + public static void initChorePool(Configuration conf) { + if (POOL == null) { + POOL = new DirScanPool(conf); + } + } + /** * @param name name of the chore being run * @param sleepPeriod the period of time to sleep between each run @@ -70,6 +171,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) { super(name, s, sleepPeriod); + + Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call" + + "CleanerChore.initChorePool(Configuration) before new a cleaner chore."); this.fs = fs; this.oldFileDir = oldFileDir; this.conf = conf; @@ -79,6 +183,36 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu /** + * Calculate size for cleaner pool. + * @param poolSize size from configuration + * @return size of pool after calculation + */ + static int calculatePoolSize(String poolSize) { + if (poolSize.matches("[1-9][0-9]*")) { + // If poolSize is an integer, return it directly, + // but upmost to the number of available processors. + int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS); + if (size == AVAIL_PROCESSORS) { + LOG.warn("Use full core processors to scan dir, size=" + size); + } + return size; + } else if (poolSize.matches("0.[0-9]+|1.0")) { + // if poolSize is a double, return poolSize * availableProcessors; + // Ensure that we always return at least one. + int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize)); + if (computedThreads < 1) { + LOG.debug("Computed " + computedThreads + " threads for CleanerChore, using 1 instead"); + return 1; + } + return computedThreads; + } else { + LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE + + ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead."); + return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE); + } + } + + /** * Validate the file to see if it even belongs in the directory. If it is valid, then the file * will go through the cleaner delegates, but otherwise the file is just deleted. * @param file full {@link Path} of the file to be checked @@ -104,6 +238,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } } + @Override + public void onConfigurationChange(Configuration conf) { + POOL.markUpdate(conf); + } + /** * A utility method to create new instances of LogCleanerDelegate based on the class name of the * LogCleanerDelegate. @@ -129,87 +268,33 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu @Override protected void chore() { - try { - FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir); - checkAndDeleteEntries(files); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Error while cleaning the logs", e); - } - } - - /** - * Loop over the given directory entries, and check whether they can be deleted. - * If an entry is itself a directory it will be recursively checked and deleted itself iff - * all subentries are deleted (and no new subentries are added in the mean time) - * - * @param entries directory entries to check - * @return true if all entries were successfully deleted - */ - private boolean checkAndDeleteEntries(FileStatus[] entries) { - if (entries == null) { - return true; - } - boolean allEntriesDeleted = true; - List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length); - for (FileStatus child : entries) { - Path path = child.getPath(); - if (child.isDirectory()) { - // for each subdirectory delete it and all entries if possible - if (!checkAndDeleteDirectory(path)) { - allEntriesDeleted = false; + if (getEnabled()) { + try { + POOL.latchCountUp(); + if (runCleaner()) { + LOG.debug("Cleaned all WALs under " + oldFileDir); + } else { + LOG.warn("WALs outstanding under " + oldFileDir); } - } else { - // collect all files to attempt to delete in one batch - files.add(child); + } finally { + POOL.latchCountDown(); } + // After each cleaner chore, checks if received reconfigure notification while cleaning. + // First in cleaner turns off notification, to avoid another cleaner updating pool again. + if (POOL.reconfigNotification.compareAndSet(true, false)) { + // This cleaner is waiting for other cleaners finishing their jobs. + // To avoid missing next chore, only wait 0.8 * period, then shutdown. + POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); + } + } else { + LOG.debug("Cleaner chore disabled! Not cleaning."); } - if (!checkAndDeleteFiles(files)) { - allEntriesDeleted = false; - } - return allEntriesDeleted; } - - /** - * Attempt to delete a directory and all files under that directory. Each child file is passed - * through the delegates to see if it can be deleted. If the directory has no children when the - * cleaners have finished it is deleted. - * <p> - * If new children files are added between checks of the directory, the directory will <b>not</b> - * be deleted. - * @param dir directory to check - * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise. - */ - @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) { - if (LOG.isTraceEnabled()) { - LOG.trace("Checking directory: " + dir); - } - - try { - FileStatus[] children = FSUtils.listStatus(fs, dir); - boolean allChildrenDeleted = checkAndDeleteEntries(children); - - // if the directory still has children, we can't delete it, so we are done - if (!allChildrenDeleted) return false; - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Error while listing directory: " + dir, e); - // couldn't list directory, so don't try to delete, and don't return success - return false; - } - // otherwise, all the children (that we know about) have been deleted, so we should try to - // delete this directory. However, don't do so recursively so we don't delete files that have - // been added since we last checked. - try { - return fs.delete(dir, false); - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Couldn't delete directory: " + dir, e); - } - // couldn't delete w/o exception, so we can't return success. - return false; - } + public Boolean runCleaner() { + CleanerTask task = new CleanerTask(this.oldFileDir, true); + POOL.submit(task); + return task.join(); } /** @@ -219,6 +304,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu * @return true iff successfully deleted all files */ private boolean checkAndDeleteFiles(List<FileStatus> files) { + if (files == null) { + return true; + } + // first check to see if the path is valid List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size()); List<FileStatus> invalidFiles = Lists.newArrayList(); @@ -256,12 +345,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles); + return deleteFiles(filesToDelete) == files.size(); + } + + /** + * Delete the given files + * @param filesToDelete files to delete + * @return number of deleted files + */ + protected int deleteFiles(Iterable<FileStatus> filesToDelete) { int deletedFileCount = 0; for (FileStatus file : filesToDelete) { Path filePath = file.getPath(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing: " + filePath + " from archive"); - } + LOG.trace("Removing " + file + " from archive"); try { boolean success = this.fs.delete(filePath, false); if (success) { @@ -271,12 +367,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); } } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); + e = e instanceof RemoteException ? + ((RemoteException)e).unwrapRemoteException() : e; LOG.warn("Error while deleting: " + filePath, e); } } - - return deletedFileCount == files.size(); + return deletedFileCount; } @Override @@ -289,4 +385,143 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu } } } + + @VisibleForTesting + int getChorePoolSize() { + return POOL.size; + } + + /** + * @param enabled + */ + public boolean setEnabled(final boolean enabled) { + return this.enabled.getAndSet(enabled); + } + + public boolean getEnabled() { + return this.enabled.get(); + } + + private interface Action<T> { + T act() throws IOException; + } + + private class CleanerTask extends RecursiveTask<Boolean> { + private final Path dir; + private final boolean root; + + CleanerTask(final FileStatus dir, final boolean root) { + this(dir.getPath(), root); + } + + CleanerTask(final Path dir, final boolean root) { + this.dir = dir; + this.root = root; + } + + @Override + protected Boolean compute() { + LOG.debug("Cleaning under " + dir); + List<FileStatus> subDirs; + final List<FileStatus> files; + try { + subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() { + @Override + public boolean accept(FileStatus f) { + return f.isDirectory(); + } + }); + files = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() { + @Override + public boolean accept(FileStatus f) { + return f.isFile(); + } + }); + } catch (IOException ioe) { + LOG.warn(dir + " doesn't exist, just skip it. ", ioe); + return true; + } + + boolean nullSubDirs = subDirs == null; + if (nullSubDirs) { + LOG.trace("There is no subdir under " + dir); + } + if (files == null) { + LOG.trace("There is no file under " + dir); + } + + int capacity = nullSubDirs ? 0 : subDirs.size(); + final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity); + if (!nullSubDirs) { + for (FileStatus subdir : subDirs) { + CleanerTask task = new CleanerTask(subdir, false); + tasks.add(task); + task.fork(); + } + } + + boolean result = true; + result &= deleteAction(new Action<Boolean>() { + @Override + public Boolean act() throws IOException { + return checkAndDeleteFiles(files); + } + }, "files"); + result &= deleteAction(new Action<Boolean>() { + @Override + public Boolean act() throws IOException { + return getCleanResult(tasks); + } + }, "subdirs"); + // if and only if files and subdirs under current dir are deleted successfully, and + // it is not the root dir, then task will try to delete it. + if (result && !root) { + result &= deleteAction(new Action<Boolean>() { + @Override + public Boolean act() throws IOException { + return fs.delete(dir, false); + } + }, "dir"); + } + return result; + } + + /** + * Perform a delete on a specified type. + * @param deletion a delete + * @param type possible values are 'files', 'subdirs', 'dirs' + * @return true if it deleted successfully, false otherwise + */ + private boolean deleteAction(Action<Boolean> deletion, String type) { + boolean deleted; + try { + LOG.trace("Start deleting " + type + " under " + dir); + deleted = deletion.act(); + } catch (IOException ioe) { + LOG.warn("Could not delete " + type + " under " + dir, ioe); + deleted = false; + } + LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted); + return deleted; + } + + /** + * Get cleaner results of subdirs. + * @param tasks subdirs cleaner tasks + * @return true if all subdirs deleted successfully, false for patial/all failures + * @throws IOException something happen during computation + */ + private boolean getCleanResult(List<CleanerTask> tasks) throws IOException { + boolean cleaned = true; + try { + for (CleanerTask task : tasks) { + cleaned &= task.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } + return cleaned; + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index 1cc8d48..0c30f95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -19,8 +19,17 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +46,12 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider; public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName()); + public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size"; + public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2; + + private final LinkedBlockingQueue<CleanerContext> pendingDelete; + private List<Thread> oldWALsCleaner; + /** * @param p the period of time to sleep between each run * @param s the stopper @@ -47,10 +62,176 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { public LogCleaner(final int p, final Stoppable s, Configuration conf, FileSystem fs, Path oldLogDir) { super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); + this.pendingDelete = new LinkedBlockingQueue<>(); + int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + this.oldWALsCleaner = createOldWalsCleaner(size); } @Override protected boolean validate(Path file) { return DefaultWALProvider.validateWALFilename(file.getName()); } + + @Override + public void onConfigurationChange(Configuration conf) { + super.onConfigurationChange(conf); + + int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + if (newSize == oldWALsCleaner.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Size from configuration is the same as previous which is " + + newSize + ", no need to update."); + } + return; + } + interruptOldWALsCleaner(); + oldWALsCleaner = createOldWalsCleaner(newSize); + } + + @Override + protected int deleteFiles(Iterable<FileStatus> filesToDelete) { + List<CleanerContext> results = new LinkedList<>(); + for (FileStatus toDelete : filesToDelete) { + CleanerContext context = CleanerContext.createCleanerContext(toDelete); + if (context != null) { + pendingDelete.add(context); + results.add(context); + } + } + + int deletedFiles = 0; + for (CleanerContext res : results) { + deletedFiles += res.getResult(500) ? 1 : 0; + } + return deletedFiles; + } + + @Override + public synchronized void cleanup() { + super.cleanup(); + interruptOldWALsCleaner(); + } + + @VisibleForTesting + int getSizeOfCleaners() { + return oldWALsCleaner.size(); + } + + private List<Thread> createOldWalsCleaner(int size) { + LOG.info("Creating OldWALs cleaners with size=" + size); + + List<Thread> oldWALsCleaner = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Thread cleaner = new Thread(new Runnable() { + @Override + public void run() { + deleteFile(); + } + }); + cleaner.setName("OldWALsCleaner-" + i); + cleaner.setDaemon(true); + cleaner.start(); + oldWALsCleaner.add(cleaner); + } + return oldWALsCleaner; + } + + private void interruptOldWALsCleaner() { + for (Thread cleaner : oldWALsCleaner) { + cleaner.interrupt(); + } + oldWALsCleaner.clear(); + } + + private void deleteFile() { + while (true) { + CleanerContext context = null; + boolean succeed = false; + boolean interrupted = false; + try { + context = pendingDelete.take(); + if (context != null) { + FileStatus toClean = context.getTargetToClean(); + succeed = this.fs.delete(toClean.getPath(), false); + } + } catch (InterruptedException ite) { + // It's most likely from configuration changing request + if (context != null) { + LOG.warn("Interrupted while cleaning oldWALs " + + context.getTargetToClean() + ", try to clean it next round."); + } + interrupted = true; + } catch (IOException e) { + // fs.delete() fails. + LOG.warn("Failed to clean oldwals with exception: " + e); + succeed = false; + } finally { + if (context != null) { + context.setResult(succeed); + } + if (interrupted) { + // Restore interrupt status + Thread.currentThread().interrupt(); + break; + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Exiting cleaner."); + } + } + + @Override + public synchronized void cancel(boolean mayInterruptIfRunning) { + super.cancel(mayInterruptIfRunning); + for (Thread t : oldWALsCleaner) { + t.interrupt(); + } + } + + private static final class CleanerContext { + // At most waits 60 seconds + static final long MAX_WAIT = 60 * 1000; + + final FileStatus target; + volatile boolean result; + volatile boolean setFromCleaner = false; + + static CleanerContext createCleanerContext(FileStatus status) { + return status != null ? new CleanerContext(status) : null; + } + + private CleanerContext(FileStatus status) { + this.target = status; + this.result = false; + } + + synchronized void setResult(boolean res) { + this.result = res; + this.setFromCleaner = true; + notify(); + } + + synchronized boolean getResult(long waitIfNotFinished) { + long totalTime = 0; + try { + while (!setFromCleaner) { + wait(waitIfNotFinished); + totalTime += waitIfNotFinished; + if (totalTime >= MAX_WAIT) { + LOG.warn("Spend too much time to delete oldwals " + target); + return result; + } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting deletion of " + target); + return result; + } + return result; + } + + FileStatus getTargetToClean() { + return target; + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 87c50d9..b3be54e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -31,8 +31,10 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -81,6 +83,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.Iterators; import com.google.common.primitives.Ints; /** @@ -1731,6 +1734,78 @@ public abstract class FSUtils { } /** + * Filters FileStatuses in an array and returns a list + * + * @param input An array of FileStatuses + * @param filter A required filter to filter the array + * @return A list of FileStatuses + */ + public static List<FileStatus> filterFileStatuses(FileStatus[] input, + FileStatusFilter filter) { + if (input == null) return null; + return filterFileStatuses(Iterators.forArray(input), filter); + } + + /** + * Filters FileStatuses in an iterator and returns a list + * + * @param input An iterator of FileStatuses + * @param filter A required filter to filter the array + * @return A list of FileStatuses + */ + public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, + FileStatusFilter filter) { + if (input == null) return null; + ArrayList<FileStatus> results = new ArrayList<>(); + while (input.hasNext()) { + FileStatus f = input.next(); + if (filter.accept(f)) { + results.add(f); + } + } + return results; + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This accommodates differences between hadoop versions, where hadoop 1 + * does not throw a FileNotFoundException, and return an empty FileStatus[] + * while Hadoop 2 will throw FileNotFoundException. + * + * @param fs file system + * @param dir directory + * @param filter file status filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus list + */ + public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, + final Path dir, final FileStatusFilter filter) throws IOException { + FileStatus [] status = null; + try { + status = fs.listStatus(dir); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + + if (status == null || status.length < 1) { + return null; + } + + if (filter == null) { + return Arrays.asList(status); + } else { + List<FileStatus> status2 = filterFileStatuses(status, filter); + if (status2 == null || status2.isEmpty()) { + return null; + } else { + return status2; + } + } + } + + /** * Throw an exception if an action is not permitted by a user on a file. * * @param ugi http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java new file mode 100644 index 0000000..b3189b0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileStatusFilter { + /** + * Tests whether or not the specified filestatus should be + * included in a filestatus list. + * + * @param f The filestatus to be tested + * @return <code>true</code> if and only if the filestatus + * should be included + */ + boolean accept(FileStatus f); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index a8713e7..b2139a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.cleaner.CleanerChore; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -170,6 +171,7 @@ public class TestZooKeeperTableArchiveClient { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); + CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -224,6 +226,7 @@ public class TestZooKeeperTableArchiveClient { // setup the delegate Stoppable stop = new StoppableImplementation(); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index 0bd0da5..505fd2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hbase.master.cleaner; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,6 +37,7 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.StoppableImplementation; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -46,6 +50,11 @@ public class TestCleanerChore { private static final Log LOG = LogFactory.getLog(TestCleanerChore.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + @Before + public void setup() throws Exception { + CleanerChore.initChorePool(UTIL.getConfiguration()); + } + @After public void cleanup() throws Exception { // delete and recreate the test directory, ensuring a clean test dir between tests @@ -275,11 +284,8 @@ public class TestCleanerChore { } }).when(spy).isFileDeletable(Mockito.any(FileStatus.class)); - // attempt to delete the directory, which - if (chore.checkAndDeleteDirectory(parent)) { - throw new Exception( - "Reported success deleting directory, should have failed when adding file mid-iteration"); - } + // run the chore + chore.chore(); // make sure all the directories + added file exist, but the original file is deleted assertTrue("Added file unexpectedly deleted", fs.exists(racyFile)); @@ -288,6 +294,154 @@ public class TestCleanerChore { Mockito.verify(spy, Mockito.times(1)).isFileDeletable(Mockito.any(FileStatus.class)); } + @Test + public void testDeleteFileWithCleanerEnabled() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + + AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + + // Enable cleaner + chore.setEnabled(true); + + // create the directory layout in the directory to clean + Path parent = new Path(testDir, "parent"); + Path child = new Path(parent, "child"); + Path file = new Path(child, "someFile"); + fs.mkdirs(child); + + // touch a new file + fs.create(file).close(); + assertTrue("Test file didn't get created.", fs.exists(file)); + + // run the chore + chore.chore(); + + // verify all the files got deleted + assertFalse("File didn't get deleted", fs.exists(file)); + assertFalse("Empty directory didn't get deleted", fs.exists(child)); + assertFalse("Empty directory didn't get deleted", fs.exists(parent)); + } + + @Test + public void testDeleteFileWithCleanerDisabled() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + + AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + + // Disable cleaner + chore.setEnabled(false); + + // create the directory layout in the directory to clean + Path parent = new Path(testDir, "parent"); + Path child = new Path(parent, "child"); + Path file = new Path(child, "someFile"); + fs.mkdirs(child); + + // touch a new file + fs.create(file).close(); + assertTrue("Test file didn't get created.", fs.exists(file)); + + // run the chore + chore.chore(); + + // verify all the files got deleted + assertTrue("File got deleted with cleaner disabled", fs.exists(file)); + assertTrue("Directory got deleted", fs.exists(child)); + assertTrue("Directory got deleted", fs.exists(parent)); + } + + @Test + public void testOnConfigurationChange() throws Exception { + int availableProcessorNum = Runtime.getRuntime().availableProcessors(); + if (availableProcessorNum == 1) { // no need to run this test + return; + } + + // have at least 2 available processors/cores + int initPoolSize = availableProcessorNum / 2; + int changedPoolSize = availableProcessorNum; + + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize)); + final AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + chore.setEnabled(true); + // Create subdirs under testDir + int dirNums = 6; + Path[] subdirs = new Path[dirNums]; + for (int i = 0; i < dirNums; i++) { + subdirs[i] = new Path(testDir, "subdir-" + i); + fs.mkdirs(subdirs[i]); + } + // Under each subdirs create 6 files + for (Path subdir : subdirs) { + createFiles(fs, subdir, 6); + } + // Start chore + Thread t = new Thread(new Runnable() { + @Override + public void run() { + chore.chore(); + } + }); + t.setDaemon(true); + t.start(); + // Change size of chore's pool + conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize)); + chore.onConfigurationChange(conf); + assertEquals(changedPoolSize, chore.getChorePoolSize()); + // Stop chore + t.join(); + } + + @Test + public void testMinimumNumberOfThreads() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + conf.set(CleanerChore.CHORE_POOL_SIZE, "2"); + AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + int numProcs = Runtime.getRuntime().availableProcessors(); + // Sanity + assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs))); + // The implementation does not allow us to set more threads than we have processors + assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs + 2))); + // Force us into the branch that is multiplying 0.0 against the number of processors + assertEquals(1, chore.calculatePoolSize("0.0")); + } + + private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { + Random random = new Random(); + for (int i = 0; i < numOfFiles; i++) { + int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M + try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { + for (int m = 0; m < xMega; m++) { + byte[] M = new byte[1024 * 1024]; + random.nextBytes(M); + fsdos.write(M); + } + } + } + } + private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> { public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs, http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 4665593..32d2afd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -56,6 +56,7 @@ public class TestHFileCleaner { public static void setupCluster() throws Exception { // have to use a minidfs cluster because the localfs doesn't modify file times correctly UTIL.startMiniDFSCluster(1); + CleanerChore.initChorePool(UTIL.getConfiguration()); } @AfterClass http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java index a872679..76709a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java @@ -96,6 +96,7 @@ public class TestHFileLinkCleaner { final long ttl = 1000; conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl); Server server = new DummyServer(); + CleanerChore.initChorePool(conf); HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir); // Link backref cannot be removed http://git-wip-us.apache.org/repos/asf/hbase/blob/193d1dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index df5916c..db15e95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -30,10 +30,12 @@ import java.net.URLEncoder; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -76,6 +78,8 @@ public class TestLogsCleaner { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniDFSCluster(1); + CleanerChore.initChorePool(TEST_UTIL.getConfiguration()); } /** @@ -84,6 +88,7 @@ public class TestLogsCleaner { @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); } @Test @@ -253,6 +258,58 @@ public class TestLogsCleaner { } } + @Test + public void testOnConfigurationChange() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE); + // Prepare environments + Server server = new DummyServer(); + Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); + assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners()); + // Create dir and files for test + fs.delete(oldWALsDir, true); + fs.mkdirs(oldWALsDir); + int numOfFiles = 10; + createFiles(fs, oldWALsDir, numOfFiles); + FileStatus[] status = fs.listStatus(oldWALsDir); + assertEquals(numOfFiles, status.length); + // Start cleaner chore + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + cleaner.chore(); + } + }); + thread.setDaemon(true); + thread.start(); + // change size of cleaners dynamically + int sizeToChange = 4; + conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange); + cleaner.onConfigurationChange(conf); + assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); + // Stop chore + thread.join(); + status = fs.listStatus(oldWALsDir); + assertEquals(0, status.length); + } + + private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { + Random random = new Random(); + for (int i = 0; i < numOfFiles; i++) { + int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M + try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { + for (int m = 0; m < xMega; m++) { + byte[] M = new byte[1024 * 1024]; + random.nextBytes(M); + fsdos.write(M); + } + } + } + } + static class DummyServer implements Server { @Override