Repository: hbase

Updated Branches:
  refs/heads/branch-1 a4db453c6 -> a52c6bf2f
HBASE-20557 Backport HBASE-17215 to branch-1

The second backport of HBASE-20555

Signed-off-by: Zach York <zy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a52c6bf2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a52c6bf2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a52c6bf2

Branch: refs/heads/branch-1
Commit: a52c6bf2f3f742ae4bec29c5d10dfc339c1e3f0d
Parents: a4db453
Author: TAK LON WU <wutak...@amazon.com>
Authored: Sat Jun 23 08:43:21 2018 -0700
Committer: Zach York <zy...@apache.org>
Committed: Mon Jul 9 11:32:04 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |   1 +
 .../hbase/master/cleaner/HFileCleaner.java      | 316 ++++++++++++++++++-
 .../hbase/regionserver/RSRpcServices.java       |   3 +-
 .../hbase/master/cleaner/TestHFileCleaner.java  | 152 +++++++++
 4 files changed, 468 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a52c6bf2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 21ec23c..c2f99f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -896,6 +896,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
         (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
     this.masterFinishedInitializationTime = System.currentTimeMillis();
     configurationManager.registerObserver(this.balancer);
+    configurationManager.registerObserver(this.hfileCleaner);
     configurationManager.registerObserver(this.logCleaner);
 
     // Set master as 'initialized'.
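
The one-line HMaster change above is what wires the new cleaner into the
online configuration-change path: once the HFileCleaner is registered with the
ConfigurationManager, the master can re-read hbase-site.xml and call the
cleaner's onConfigurationChange() (added below) without a restart. As a
minimal client-side sketch of triggering that reload (not part of this commit;
it assumes an hbase-client classpath and a running cluster):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ReloadMasterConf {
      public static void main(String[] args) throws Exception {
        // Edit hbase-site.xml on the master first; this call only asks the
        // server to re-read it and notify registered ConfigurationObservers.
        try (Connection conn =
            ConnectionFactory.createConnection(HBaseConfiguration.create());
            Admin admin = conn.getAdmin()) {
          ServerName master = admin.getClusterStatus().getMaster();
          admin.updateConfiguration(master);
        }
      }
    }

The same reload can also be issued from the HBase shell with the
update_config command.
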
http://git-wip-us.apache.org/repos/asf/hbase/blob/a52c6bf2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 89c316b..defe851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -17,22 +17,32 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
  * folder that are deletable for each HFile cleaner in the chain.
  */
 @InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
+    ConfigurationObserver {
 
   public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
 
@@ -41,6 +51,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
     this(period, stopper, conf, fs, directory, null);
   }
 
+  // Configuration key for large/small throttle point
+  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
+      "hbase.regionserver.thread.hfilecleaner.throttle";
+  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024; // 64M
+
+  // Configuration key for large queue size
+  public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.large.queue.size";
+  public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  // Configuration key for small queue size
+  public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.small.queue.size";
+  public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
+
+  BlockingQueue<HFileDeleteTask> largeFileQueue;
+  BlockingQueue<HFileDeleteTask> smallFileQueue;
+  private int throttlePoint;
+  private int largeQueueSize;
+  private int smallQueueSize;
+  private List<Thread> threads = new ArrayList<Thread>();
+  private boolean running;
+
+  private long deletedLargeFiles = 0L;
+  private long deletedSmallFiles = 0L;
+
   /**
    * @param period the period of time to sleep between each run
    * @param stopper the stopper
@@ -53,6 +91,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
       Path directory, Map<String, Object> params) {
     super("HFileCleaner", period,
       stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, params);
+    throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    startHFileDeleteThreads();
   }
 
   @Override
@@ -69,4 +116,267 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
   public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
     return this.cleanersChain;
   }
+
+  @Override
+  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
+    int deletedFiles = 0;
+    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
+    // construct delete tasks and add into relative queue
+    for (FileStatus file : filesToDelete) {
+      HFileDeleteTask task = deleteFile(file);
+      if (task != null) {
+        tasks.add(task);
+      }
+    }
+    // wait for each submitted task to finish
+    for (HFileDeleteTask task : tasks) {
+      if (task.getResult()) {
+        deletedFiles++;
+      }
+    }
+    return deletedFiles;
+  }
+
+  /**
+   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
+   * @param file the file to delete
+   * @return HFileDeleteTask to track progress
+   */
+  private HFileDeleteTask deleteFile(FileStatus file) {
+    HFileDeleteTask task = new HFileDeleteTask(file);
+    boolean enqueued = dispatch(task);
+    return enqueued ? task : null;
+  }
+
+  private boolean dispatch(HFileDeleteTask task) {
+    if (task.fileLength >= this.throttlePoint) {
+      if (!this.largeFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Large file deletion queue is full");
+        }
+        return false;
+      }
+    } else {
+      if (!this.smallFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Small file deletion queue is full");
+        }
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    stopHFileDeleteThreads();
+  }
+
+  /**
+   * Start threads for hfile deletion
+   */
+  private void startHFileDeleteThreads() {
+    final String n = Thread.currentThread().getName();
+    running = true;
+    // start thread for large file deletion
+    Thread large = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(largeFileQueue);
+      }
+    };
+    large.setDaemon(true);
+    large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
+    large.start();
+    LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+    threads.add(large);
+
+    // start thread for small file deletion
+    Thread small = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(smallFileQueue);
+      }
+    };
+    small.setDaemon(true);
+    small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
+    small.start();
+    LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+    threads.add(small);
+  }
+
+  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
+    try {
+      while (running) {
+        HFileDeleteTask task = null;
+        try {
+          task = queue.take();
+        } catch (InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Interrupted while trying to take a task from queue", e);
+          }
+          break;
+        }
+        if (task != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing: " + task.filePath + " from archive");
+          }
+          boolean succeed;
+          try {
+            succeed = this.fs.delete(task.filePath, false);
+          } catch (IOException e) {
+            LOG.warn("Failed to delete file " + task.filePath, e);
+            succeed = false;
+          }
+          task.setResult(succeed);
+          if (succeed) {
+            countDeletedFiles(queue == largeFileQueue);
+          }
+        }
+      }
+    } finally {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exit thread: " + Thread.currentThread());
+      }
+    }
+  }
+
+  // Currently only for testing purpose
+  private void countDeletedFiles(boolean isLarge) {
+    if (isLarge) {
+      if (deletedLargeFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
+        deletedLargeFiles = 0L;
+      }
+      deletedLargeFiles++;
+    } else {
+      if (deletedSmallFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
+        deletedSmallFiles = 0L;
+      }
+      deletedSmallFiles++;
+    }
+  }
+
+  /**
+   * Stop threads for hfile deletion
+   */
+  private void stopHFileDeleteThreads() {
+    running = false;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping file delete threads");
+    }
+    for (Thread thread : threads) {
+      thread.interrupt();
+    }
+  }
+
+  static class HFileDeleteTask {
+    private static final long MAX_WAIT = 60 * 1000L;
+    private static final long WAIT_UNIT = 1000L;
+
+    boolean done = false;
+    boolean result;
+    final Path filePath;
+    final long fileLength;
+
+    public HFileDeleteTask(FileStatus file) {
+      this.filePath = file.getPath();
+      this.fileLength = file.getLen();
+    }
+
+    public synchronized void setResult(boolean result) {
+      this.done = true;
+      this.result = result;
+      notify();
+    }
+
+    public synchronized boolean getResult() {
+      long waitTime = 0;
+      try {
+        while (!done) {
+          wait(WAIT_UNIT);
+          waitTime += WAIT_UNIT;
+          if (done) {
+            return this.result;
+          }
+          if (waitTime > MAX_WAIT) {
+            LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath +
+                ", exit...");
+            return false;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for result of deleting " + filePath +
+            ", will return false", e);
+        return false;
+      }
+      return this.result;
+    }
+  }
+
+  @VisibleForTesting
+  public List<Thread> getCleanerThreads() {
+    return threads;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedLargeFiles() {
+    return deletedLargeFiles;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedSmallFiles() {
+    return deletedSmallFiles;
+  }
+
+  @VisibleForTesting
+  public long getLargeQueueSize() {
+    return largeQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getSmallQueueSize() {
+    return smallQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getThrottlePoint() {
+    return throttlePoint;
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
+        .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
+        .append(", smallQueueSize: ").append(smallQueueSize);
+    stopHFileDeleteThreads();
+    this.throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    this.largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    this.smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    // record the left over tasks
+    List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+    for (HFileDeleteTask task : largeFileQueue) {
+      leftOverTasks.add(task);
+    }
+    for (HFileDeleteTask task : smallFileQueue) {
+      leftOverTasks.add(task);
+    }
+    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    threads.clear();
+    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
+        .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+    LOG.debug(builder.toString());
+    startHFileDeleteThreads();
+    // re-dispatch the left over tasks
+    for (HFileDeleteTask task : leftOverTasks) {
+      dispatch(task);
+    }
+  }
 }
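
Taken together, the new code above replaces the chore's serial deletes with a
small producer/consumer pipeline: deleteFiles() wraps each archived file in an
HFileDeleteTask, dispatch() routes the task by comparing its length against
the throttle point, and two daemon threads drain the large-file and small-file
queues independently, so a burst of huge files can no longer starve many small
deletions. A task rejected by a full queue is simply left in the archive and
retried on the next chore run. The three new keys are the tuning surface; here
is a sketch of overriding them programmatically (the values are illustrative,
not recommendations, and in practice they would live in hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

    public class CleanerTuning {
      static Configuration tunedConf() {
        Configuration conf = HBaseConfiguration.create();
        // Files of at least 128 MB go to the large-file thread,
        // everything else to the small-file thread.
        conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 128 * 1024 * 1024);
        // Bound the number of delete tasks each queue may buffer.
        conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, 4096);
        conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, 4096);
        return conf;
      }
    }
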
http://git-wip-us.apache.org/repos/asf/hbase/blob/a52c6bf2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1fbc412..ae546e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1355,7 +1355,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       throw new RegionServerStoppedException("File system not available");
     }
     if (!regionServer.isOnline()) {
-      throw new ServerNotRunningYetException("Server is not running yet");
+      throw new ServerNotRunningYetException("Server " + regionServer.serverName +
+          " is not running yet");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a52c6bf2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 32d2afd..5712729 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -254,4 +257,153 @@ public class TestHFileCleaner {
       return null;
     }
   }
+
+  @Test
+  public void testThreadCleanup() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // create some file to delete
+    fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
+    // launch the chore
+    cleaner.chore();
+    // call cleanup
+    cleaner.cleanup();
+    // wait awhile for thread to die
+    Thread.sleep(100);
+    for (Thread thread : cleaner.getCleanerThreads()) {
+      Assert.assertFalse(thread.isAlive());
+    }
+  }
+
+  @Test
+  public void testLargeSmallIsolation() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // necessary set up
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+    // call cleanup
+    cleaner.chore();
+
+    Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
+    Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void testOnConfigurationChange() throws Exception {
+    // constants
+    final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
+    final int ORIGINAL_QUEUE_SIZE = 512;
+    final int UPDATE_THROTTLE_POINT = 1024; // small enough to change large/small check
+    final int UPDATE_QUEUE_SIZE = 1024;
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
+    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+
+    // clean up archive directory and create files for testing
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+
+    // call cleaner, run as daemon to test the interrupt-at-middle case
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        cleaner.chore();
+      }
+    };
+    t.setDaemon(true);
+    t.start();
+    // let the cleaner run for some while
+    Thread.sleep(20);
+
+    // trigger configuration change
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    cleaner.onConfigurationChange(newConf);
+    LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() +
+        "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+
+    // check values after change
+    Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+
+    // wait until clean done and check
+    t.join();
+    LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() +
+        "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+    Assert.assertTrue("Should delete more than " + LARGE_FILE_NUM +
+        " files from large queue but actually " + cleaner.getNumOfDeletedLargeFiles(),
+        cleaner.getNumOfDeletedLargeFiles() > LARGE_FILE_NUM);
+    Assert.assertTrue("Should delete less than " + SMALL_FILE_NUM +
+        " files from small queue but actually " + cleaner.getNumOfDeletedSmallFiles(),
+        cleaner.getNumOfDeletedSmallFiles() < SMALL_FILE_NUM);
+  }
+
+  private void createFilesForTesting(int largeFileNum, int smallFileNum, FileSystem fs,
+      Path archivedHfileDir) throws IOException {
+    final Random rand = new Random();
+    final byte[] large = new byte[1024 * 1024];
+    for (int i = 0; i < large.length; i++) {
+      large[i] = (byte) rand.nextInt(128);
+    }
+    final byte[] small = new byte[1024];
+    for (int i = 0; i < small.length; i++) {
+      small[i] = (byte) rand.nextInt(128);
+    }
+    // create large and small files
+    for (int i = 1; i <= largeFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "large-file-" + i));
+      out.write(large);
+      out.close();
+    }
+    for (int i = 1; i <= smallFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "small-file-" + i));
+      out.write(small);
+      out.close();
+    }
+  }
 }
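
The two throttle-sensitive tests rest on simple size arithmetic: with the
threshold at 512 KB, the five 1 MB files must all land on the large-file
counter and the twenty 1 KB files on the small one. testOnConfigurationChange
then drops the threshold to 1 KB mid-run, so still-queued 1 KB tasks are
re-dispatched to the large queue, which is why it asserts strictly more than
LARGE_FILE_NUM large deletions and strictly fewer than SMALL_FILE_NUM small
ones. To run just this class locally, a standard Maven invocation along these
lines should work from the repository root:

    mvn test -pl hbase-server -Dtest=TestHFileCleaner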