Repository: hbase Updated Branches: refs/heads/branch-1.3 bd0466f15 -> 410b80984
Revert "HBASE-16101 Tool to microbenchmark procedure WAL performance." "Missing some dependent functions in ProcedureTestingUtility which results in compile failure." This reverts commit bd0466f1534a6230ebef17bf2587ba0bde3d6522. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/410b8098 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/410b8098 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/410b8098 Branch: refs/heads/branch-1.3 Commit: 410b8098482f7583ef2ab938c6a3d3b442d62357 Parents: bd0466f Author: Apekshit Sharma <[email protected]> Authored: Tue Aug 30 15:21:36 2016 -0700 Committer: Apekshit Sharma <[email protected]> Committed: Tue Aug 30 15:21:36 2016 -0700 ---------------------------------------------------------------------- hbase-assembly/pom.xml | 11 - .../hadoop/hbase/util/AbstractHBaseTool.java | 78 +++--- ...ProcedureWALLoaderPerformanceEvaluation.java | 248 ----------------- .../wal/ProcedureWALPerformanceEvaluation.java | 267 ------------------- 4 files changed, 37 insertions(+), 567 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml index a651c75..eca3f12 100644 --- a/hbase-assembly/pom.xml +++ b/hbase-assembly/pom.xml @@ -165,17 +165,6 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-thrift</artifactId> </dependency> - <!-- To dump tools in hbase-procedure into cached_classpath.txt. --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-procedure</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-procedure</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index 6e3dec6..a876aef 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -17,11 +17,13 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; @@ -38,11 +40,12 @@ import org.apache.hadoop.util.ToolRunner; */ @InterfaceAudience.Private public abstract class AbstractHBaseTool implements Tool { + protected static final int EXIT_SUCCESS = 0; protected static final int EXIT_FAILURE = 1; - private static final Option HELP_OPTION = new Option("h", "help", false, - "Prints help for this tool."); + private static final String SHORT_HELP_OPTION = "h"; + private static final String LONG_HELP_OPTION = "help"; private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class); @@ -50,6 +53,8 @@ public abstract class AbstractHBaseTool implements Tool { protected Configuration conf = null; + private static final Set<String> requiredOptions = new TreeSet<String>(); + protected String[] cmdLineArgs = null; /** @@ -78,7 +83,6 @@ public abstract class AbstractHBaseTool implements Tool { @Override public final int run(String[] args) throws IOException { - cmdLineArgs = args; if (conf == null) { LOG.error("Tool configuration is not initialized"); throw new NullPointerException("conf"); @@ -86,22 +90,24 @@ public abstract class AbstractHBaseTool implements Tool { CommandLine cmd; try { - addOptions(); - if (isHelpCommand(args)) { - printUsage(); - return EXIT_SUCCESS; - } // parse the command line arguments - cmd = new BasicParser().parse(options, args); + cmd = parseArgs(args); + cmdLineArgs = args; } catch (ParseException e) { LOG.error("Error when parsing command-line arguments", e); printUsage(); return EXIT_FAILURE; } + if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION) || + !sanityCheckOptions(cmd)) { + printUsage(); + return EXIT_FAILURE; + } + processOptions(cmd); - int ret; + int ret = EXIT_FAILURE; try { ret = doWork(); } catch (Exception e) { @@ -111,11 +117,22 @@ public abstract class AbstractHBaseTool implements Tool { return ret; } - private boolean isHelpCommand(String[] args) throws ParseException { - Options helpOption = new Options().addOption(HELP_OPTION); - // this parses the command line but doesn't throw an exception on unknown options - CommandLine cl = new BasicParser().parse(helpOption, args, true); - return cl.getOptions().length != 0; + private boolean sanityCheckOptions(CommandLine cmd) { + boolean success = true; + for (String reqOpt : requiredOptions) { + if (!cmd.hasOption(reqOpt)) { + LOG.error("Required option -" + reqOpt + " is missing"); + success = false; + } + } + return success; + } + + protected CommandLine parseArgs(String[] args) throws ParseException { + options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage"); + addOptions(); + CommandLineParser parser = new BasicParser(); + return parser.parse(options, args); } protected void printUsage() { @@ -129,20 +146,14 @@ public abstract class AbstractHBaseTool implements Tool { helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); } - protected void addOption(Option option) { - options.addOption(option); - } - protected void addRequiredOptWithArg(String opt, String description) { - Option option = new Option(opt, true, description); - option.setRequired(true); - options.addOption(option); + requiredOptions.add(opt); + addOptWithArg(opt, description); } protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) { - Option option = new Option(shortOpt, longOpt, true, description); - option.setRequired(true); - options.addOption(option); + requiredOptions.add(longOpt); + addOptWithArg(shortOpt, longOpt, description); } protected void addOptNoArg(String opt, String description) { @@ -161,21 +172,6 @@ public abstract class AbstractHBaseTool implements Tool { options.addOption(shortOpt, longOpt, true, description); } - public int getOptionAsInt(CommandLine cmd, String opt, int defaultValue) { - if (cmd.hasOption(opt)) { - return Integer.parseInt(cmd.getOptionValue(opt)); - } else { - return defaultValue; - } - } - - public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) { - if (cmd.hasOption(opt)) { - return Double.parseDouble(cmd.getOptionValue(opt)); - } else { - return defaultValue; - } - } /** * Parse a number and enforce a range. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java deleted file mode 100644 index 347239d..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.store.wal; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; -import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.util.AbstractHBaseTool; - -import static java.lang.System.currentTimeMillis; - -public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { - protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - - // Command line options and defaults. - public static int DEFAULT_NUM_PROCS = 1000000; // 1M - public static Option NUM_PROCS_OPTION = new Option("procs", true, - "Total number of procedures. Default: " + DEFAULT_NUM_PROCS); - public static int DEFAULT_NUM_WALS = 0; - public static Option NUM_WALS_OPTION = new Option("wals", true, - "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + - " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); - public static int DEFAULT_STATE_SIZE = 1024; // 1KB - public static Option STATE_SIZE_OPTION = new Option("size", true, - "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE - + " bytes"); - public static int DEFAULT_UPDATES_PER_PROC = 5; - public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true, - "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC); - public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50; - public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true, - "Fraction of procs for which to write delete state. Distribution of procs chosen for " - + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION); - - public int numProcs; - public int updatesPerProc; - public double deleteProcsFraction; - public int numWals; - private WALProcedureStore store; - static byte[] serializedState; - - private class LoadCounter implements ProcedureStore.ProcedureLoader { - public LoadCounter() {} - - @Override - public void setMaxProcId(long maxProcId) { - } - - @Override - public void load(ProcedureIterator procIter) throws IOException { - while (procIter.hasNext()) { - if (procIter.isNextCompleted()) { - ProcedureInfo proc = procIter.nextAsProcedureInfo(); - } else { - Procedure proc = procIter.nextAsProcedure(); - } - } - } - - @Override - public void handleCorrupted(ProcedureIterator procIter) throws IOException { - while (procIter.hasNext()) { - Procedure proc = procIter.nextAsProcedure(); - } - } - } - - @Override - protected void addOptions() { - addOption(NUM_PROCS_OPTION); - addOption(UPDATES_PER_PROC_OPTION); - addOption(DELETE_PROCS_FRACTION_OPTION); - addOption(NUM_WALS_OPTION); - addOption(STATE_SIZE_OPTION); - } - - @Override - protected void processOptions(CommandLine cmd) { - numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); - numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); - int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); - serializedState = new byte[stateSize]; - updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(), - DEFAULT_UPDATES_PER_PROC); - deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(), - DEFAULT_DELETE_PROCS_FRACTION); - setupConf(); - } - - private void setupConf() { - if (numWals > 0) { - conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); - } - } - - public void setUpProcedureStore() throws IOException { - Path testDir = UTIL.getDataTestDir(); - FileSystem fs = testDir.getFileSystem(conf); - Path logDir = new Path(testDir, "proc-logs"); - System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); - fs.delete(logDir, true); - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); - store.start(1); - store.recoverLease(); - store.load(new LoadCounter()); - } - - /** - * @return a list of shuffled integers which represent state of proc id. First occurrence of a - * number denotes insert state, consecutive occurrences denote update states, and -ve value - * denotes delete state. - */ - private List<Integer> shuffleProcWriteSequence() { - Random rand = new Random(); - List<Integer> procStatesSequence = new ArrayList<>(); - Set<Integer> toBeDeletedProcs = new HashSet<>(); - // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add - // extra entry which is marked -ve in the loop after shuffle. - for (int procId = 1; procId <= numProcs; ++procId) { - procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId)); - if (rand.nextFloat() < deleteProcsFraction) { - procStatesSequence.add(procId); - toBeDeletedProcs.add(procId); - } - } - Collections.shuffle(procStatesSequence); - // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state. - for (int i = procStatesSequence.size() - 1; i >= 0; --i) { - int procId = procStatesSequence.get(i); - if (toBeDeletedProcs.contains(procId)) { - procStatesSequence.set(i, -1 * procId); - toBeDeletedProcs.remove(procId); - } - } - return procStatesSequence; - } - - private void writeWals() throws IOException { - List<Integer> procStates = shuffleProcWriteSequence(); - TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used. - int numProcsPerWal = numWals > 0 ? (int)Math.ceil(procStates.size() / numWals) - : Integer.MAX_VALUE; - long startTime = currentTimeMillis(); - long lastTime = startTime; - for (int i = 0; i < procStates.size(); ++i) { - int procId = procStates.get(i); - if (procId < 0) { - store.delete(procs[-procId].getProcId()); - procs[-procId] = null; - } else if (procs[procId] == null) { - procs[procId] = new TestProcedure(procId, 0); - procs[procId].setData(serializedState); - store.insert(procs[procId], null); - } else { - store.update(procs[procId]); - } - if (i > 0 && i % numProcsPerWal == 0) { - long currentTime = currentTimeMillis(); - System.out.println("Forcing wall roll. Time taken on last WAL: " + - (currentTime - lastTime) / 1000.0f + " sec"); - store.rollWriterForTesting(); - lastTime = currentTime; - } - } - long timeTaken = currentTimeMillis() - startTime; - System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : " - + StringUtils.humanTimeDiff(timeTaken) + "\n\n"); - } - - private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException { - System.out.println("Restarting procedure store to read back the WALs"); - store.stop(false); - store.start(1); - store.recoverLease(); - - long startTime = currentTimeMillis(); - store.load(loader); - long timeTaken = System.currentTimeMillis() - startTime; - System.out.println("******************************************"); - System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec"); - System.out.println("******************************************"); - } - - public void tearDownProcedureStore() { - store.stop(false); - try { - store.getFileSystem().delete(store.getLogDir(), true); - } catch (IOException e) { - System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " - + "disk space. Location: " + store.getLogDir().toString()); - System.err.println(e.toString()); - } - } - - @Override - protected int doWork() { - try { - setUpProcedureStore(); - writeWals(); - storeRestart(new LoadCounter()); - return EXIT_SUCCESS; - } catch (IOException e) { - e.printStackTrace(); - return EXIT_FAILURE; - } finally { - tearDownProcedureStore(); - } - } - - public static void main(String[] args) throws IOException { - ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation(); - tool.setConf(UTIL.getConfiguration()); - tool.run(args); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java deleted file mode 100644 index 210ac43..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ /dev/null @@ -1,267 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.store.wal; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; -import org.apache.hadoop.hbase.procedure2.util.*; - -import org.apache.hadoop.hbase.util.AbstractHBaseTool; - -public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { - protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - - // Command line options and defaults. - public static int DEFAULT_NUM_THREADS = 20; - public static Option NUM_THREADS_OPTION = new Option("threads", true, - "Number of parallel threads which will write insert/updates/deletes to WAL. Default: " - + DEFAULT_NUM_THREADS); - public static int DEFAULT_NUM_PROCS = 1000000; // 1M - public static Option NUM_PROCS_OPTION = new Option("procs", true, - "Total number of procedures. Each procedure writes one insert and one update. Default: " - + DEFAULT_NUM_PROCS); - public static int DEFAULT_NUM_WALS = 0; - public static Option NUM_WALS_OPTION = new Option("wals", true, - "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + - " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); - public static int DEFAULT_STATE_SIZE = 1024; // 1KB - public static Option STATE_SIZE_OPTION = new Option("size", true, - "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE - + "bytes"); - public static Option SYNC_OPTION = new Option("sync", true, - "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " - + "hsync, nosync. Default: hflush"); - public static String DEFAULT_SYNC_OPTION = "hflush"; - - public int numThreads; - public long numProcs; - public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value. - public int numWals; - public String syncType; - public int stateSize; - static byte[] serializedState; - private WALProcedureStore store; - - /** Used by {@link Worker}. */ - private AtomicLong procIds = new AtomicLong(0); - private AtomicBoolean workersFailed = new AtomicBoolean(false); - // Timeout for worker threads. - private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds - - // Non-default configurations. - private void setupConf() { - conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType)); - if (numWals > 0) { - conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); - numProcsPerWal = numProcs / numWals; - } - } - - private void setupProcedureStore() throws IOException { - Path testDir = UTIL.getDataTestDir(); - FileSystem fs = testDir.getFileSystem(conf); - Path logDir = new Path(testDir, "proc-logs"); - System.out.println("Logs directory : " + logDir.toString()); - fs.delete(logDir, true); - if ("nosync".equals(syncType)) { - store = new NoSyncWalProcedureStore(conf, fs, logDir); - } else { - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); - } - store.start(numThreads); - store.recoverLease(); - store.load(new ProcedureTestingUtility.LoadCounter()); - System.out.println("Starting new log : " - + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); - } - - private void tearDownProcedureStore() { - store.stop(false); - try { - store.getFileSystem().delete(store.getLogDir(), true); - } catch (IOException e) { - System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " - + "disk space. Location: " + store.getLogDir().toString()); - e.printStackTrace(); - } - } - - /** - * Processes and validates command line options. - */ - @Override - public void processOptions(CommandLine cmd) { - numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS); - numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); - numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); - syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION); - assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType): - "sync argument can only accept one of these three values: hsync, hflush, nosync"; - stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); - serializedState = new byte[stateSize]; - setupConf(); - } - - @Override - public void addOptions() { - addOption(NUM_THREADS_OPTION); - addOption(NUM_PROCS_OPTION); - addOption(NUM_WALS_OPTION); - addOption(SYNC_OPTION); - addOption(STATE_SIZE_OPTION); - } - - @Override - public int doWork() { - try { - setupProcedureStore(); - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - Future<Integer>[] futures = (Future<Integer>[]) new Object[numThreads]; - // Start worker threads. - long start = System.currentTimeMillis(); - for (int i = 0; i < numThreads; i++) { - futures[i] = executor.submit(this.new Worker(start)); - } - boolean failure = false; - try { - for (Future<Integer> future : futures) { - long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis(); - failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE)); - } - } catch (Exception e) { - System.err.println("Exception in worker thread."); - e.printStackTrace(); - return EXIT_FAILURE; - } - executor.shutdown(); - if (failure) { - return EXIT_FAILURE; - } - long timeTaken = System.currentTimeMillis() - start; - System.out.println("******************************************"); - System.out.println("Num threads : " + numThreads); - System.out.println("Num procedures : " + numProcs); - System.out.println("Sync type : " + syncType); - System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec"); - System.out.println("******************************************"); - return EXIT_SUCCESS; - } catch (IOException e) { - e.printStackTrace(); - return EXIT_FAILURE; - } finally { - tearDownProcedureStore(); - } - } - - /////////////////////////////// - // HELPER CLASSES - /////////////////////////////// - - /** - * Callable to generate load for wal by inserting/deleting/updating procedures. - * If procedure store fails to roll log file (throws IOException), all threads quit, and at - * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. - */ - class Worker implements Callable<Integer> { - final long start; - - public Worker(long start) { - this.start = start; - } - - // TODO: Can also collect #procs, time taken by each thread to measure fairness. - @Override - public Integer call() throws IOException { - while (true) { - if (workersFailed.get()) { - return EXIT_FAILURE; - } - long procId = procIds.getAndIncrement(); - if (procId >= numProcs) { - break; - } - if (procId != 0 && procId % 10000 == 0) { - long ms = System.currentTimeMillis() - start; - System.out.println("Wrote " + procId + " procedures in " - + StringUtils.humanTimeDiff(ms)); - } - try{ - if (procId > 0 && procId % numProcsPerWal == 0) { - store.rollWriterForTesting(); - System.out.println("Starting new log : " - + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); - } - } catch (IOException ioe) { - // Ask other threads to quit too. - workersFailed.set(true); - System.err.println("Exception when rolling log file. Current procId = " + procId); - ioe.printStackTrace(); - return EXIT_FAILURE; - } - ProcedureTestingUtility.TestProcedure proc = - new ProcedureTestingUtility.TestProcedure(procId); - proc.setData(serializedState); - store.insert(proc, null); - store.update(proc); - } - return EXIT_SUCCESS; - } - } - - public class NoSyncWalProcedureStore extends WALProcedureStore { - public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, - final Path logDir) { - super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { - @Override - public void recoverFileLease(FileSystem fs, Path path) throws IOException { - // no-op - } - }); - } - - @Override - protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) - throws IOException { - long totalSynced = 0; - for (int i = 0; i < count; ++i) { - totalSynced += slots[offset + i].size(); - } - return totalSynced; - } - } - - public static void main(String[] args) throws IOException { - ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation(); - tool.setConf(UTIL.getConfiguration()); - tool.run(args); - } -} \ No newline at end of file
