Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 bd0466f15 -> 410b80984


Revert "HBASE-16101 Tool to microbenchmark procedure WAL performance."
"Missing some dependent functions in ProcedureTestingUtility which results in 
compile failure."

This reverts commit bd0466f1534a6230ebef17bf2587ba0bde3d6522.
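(For context: the reverted tools call ProcedureTestingUtility helpers such as createWalStore, LoadCounter, and TestProcedure, and per the message above not all of these are available on branch-1.3, hence the compile failure. The fragment below is excerpted from the deleted sources in the diff and is illustrative only; it compiles only where those helpers exist.)

    // Illustrative excerpt from the deleted tools: these are the calls that
    // depend on ProcedureTestingUtility helpers missing on this branch.
    WALProcedureStore store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
    store.load(new ProcedureTestingUtility.LoadCounter());
    ProcedureTestingUtility.TestProcedure proc = new ProcedureTestingUtility.TestProcedure(procId);
    proc.setData(serializedState);
    store.insert(proc, null);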


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/410b8098
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/410b8098
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/410b8098

Branch: refs/heads/branch-1.3
Commit: 410b8098482f7583ef2ab938c6a3d3b442d62357
Parents: bd0466f
Author: Apekshit Sharma <[email protected]>
Authored: Tue Aug 30 15:21:36 2016 -0700
Committer: Apekshit Sharma <[email protected]>
Committed: Tue Aug 30 15:21:36 2016 -0700

----------------------------------------------------------------------
 hbase-assembly/pom.xml                          |  11 -
 .../hadoop/hbase/util/AbstractHBaseTool.java    |  78 +++---
 ...ProcedureWALLoaderPerformanceEvaluation.java | 248 -----------------
 .../wal/ProcedureWALPerformanceEvaluation.java  | 267 -------------------
 4 files changed, 37 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index a651c75..eca3f12 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -165,17 +165,6 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-thrift</artifactId>
     </dependency>
-    <!-- To dump tools in hbase-procedure into cached_classpath.txt. -->
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-procedure</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-procedure</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-hadoop-compat</artifactId>

http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
index 6e3dec6..a876aef 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
@@ -17,11 +17,13 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -38,11 +40,12 @@ import org.apache.hadoop.util.ToolRunner;
  */
 @InterfaceAudience.Private
 public abstract class AbstractHBaseTool implements Tool {
+
   protected static final int EXIT_SUCCESS = 0;
   protected static final int EXIT_FAILURE = 1;
 
-  private static final Option HELP_OPTION = new Option("h", "help", false,
-      "Prints help for this tool.");
+  private static final String SHORT_HELP_OPTION = "h";
+  private static final String LONG_HELP_OPTION = "help";
 
   private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
 
@@ -50,6 +53,8 @@ public abstract class AbstractHBaseTool implements Tool {
 
   protected Configuration conf = null;
 
+  private static final Set<String> requiredOptions = new TreeSet<String>();
+
   protected String[] cmdLineArgs = null;
 
   /**
@@ -78,7 +83,6 @@ public abstract class AbstractHBaseTool implements Tool {
 
   @Override
   public final int run(String[] args) throws IOException {
-    cmdLineArgs = args;
     if (conf == null) {
       LOG.error("Tool configuration is not initialized");
       throw new NullPointerException("conf");
@@ -86,22 +90,24 @@ public abstract class AbstractHBaseTool implements Tool {
 
     CommandLine cmd;
     try {
-      addOptions();
-      if (isHelpCommand(args)) {
-        printUsage();
-        return EXIT_SUCCESS;
-      }
       // parse the command line arguments
-      cmd = new BasicParser().parse(options, args);
+      cmd = parseArgs(args);
+      cmdLineArgs = args;
     } catch (ParseException e) {
       LOG.error("Error when parsing command-line arguments", e);
       printUsage();
       return EXIT_FAILURE;
     }
 
+    if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION) ||
+        !sanityCheckOptions(cmd)) {
+      printUsage();
+      return EXIT_FAILURE;
+    }
+
     processOptions(cmd);
 
-    int ret;
+    int ret = EXIT_FAILURE;
     try {
       ret = doWork();
     } catch (Exception e) {
@@ -111,11 +117,22 @@ public abstract class AbstractHBaseTool implements Tool {
     return ret;
   }
 
-  private boolean isHelpCommand(String[] args) throws ParseException {
-    Options helpOption = new Options().addOption(HELP_OPTION);
-    // this parses the command line but doesn't throw an exception on unknown options
-    CommandLine cl = new BasicParser().parse(helpOption, args, true);
-    return cl.getOptions().length != 0;
+  private boolean sanityCheckOptions(CommandLine cmd) {
+    boolean success = true;
+    for (String reqOpt : requiredOptions) {
+      if (!cmd.hasOption(reqOpt)) {
+        LOG.error("Required option -" + reqOpt + " is missing");
+        success = false;
+      }
+    }
+    return success;
+  }
+
+  protected CommandLine parseArgs(String[] args) throws ParseException {
+    options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
+    addOptions();
+    CommandLineParser parser = new BasicParser();
+    return parser.parse(options, args);
   }
 
   protected void printUsage() {
@@ -129,20 +146,14 @@ public abstract class AbstractHBaseTool implements Tool {
     helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
   }
 
-  protected void addOption(Option option) {
-    options.addOption(option);
-  }
-
   protected void addRequiredOptWithArg(String opt, String description) {
-    Option option = new Option(opt, true, description);
-    option.setRequired(true);
-    options.addOption(option);
+    requiredOptions.add(opt);
+    addOptWithArg(opt, description);
   }
 
   protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
-    Option option = new Option(shortOpt, longOpt, true, description);
-    option.setRequired(true);
-    options.addOption(option);
+    requiredOptions.add(longOpt);
+    addOptWithArg(shortOpt, longOpt, description);
   }
 
   protected void addOptNoArg(String opt, String description) {
@@ -161,21 +172,6 @@ public abstract class AbstractHBaseTool implements Tool {
     options.addOption(shortOpt, longOpt, true, description);
   }
 
-  public int getOptionAsInt(CommandLine cmd, String opt, int defaultValue) {
-    if (cmd.hasOption(opt)) {
-      return Integer.parseInt(cmd.getOptionValue(opt));
-    } else {
-      return defaultValue;
-    }
-  }
-
-  public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
-    if (cmd.hasOption(opt)) {
-      return Double.parseDouble(cmd.getOptionValue(opt));
-    } else {
-      return defaultValue;
-    }
-  }
   /**
    * Parse a number and enforce a range.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
deleted file mode 100644
index 347239d..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.store.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
-
-import static java.lang.System.currentTimeMillis;
-
-public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
-  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
-
-  // Command line options and defaults.
-  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
-  public static Option NUM_PROCS_OPTION = new Option("procs", true,
-      "Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
-  public static int DEFAULT_NUM_WALS = 0;
-  public static Option NUM_WALS_OPTION = new Option("wals", true,
-      "Number of WALs to write. If -ve or 0, uses " + 
WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
-          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
-  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
-  public static Option STATE_SIZE_OPTION = new Option("size", true,
-      "Size of serialized state in bytes to write on update. Default: " + 
DEFAULT_STATE_SIZE
-          + " bytes");
-  public static int DEFAULT_UPDATES_PER_PROC = 5;
-  public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
-      "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
-  public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
-  public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
-      "Fraction of procs for which to write delete state. Distribution of procs chosen for "
-          + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);
-
-  public int numProcs;
-  public int updatesPerProc;
-  public double deleteProcsFraction;
-  public int numWals;
-  private WALProcedureStore store;
-  static byte[] serializedState;
-
-  private class LoadCounter implements ProcedureStore.ProcedureLoader {
-    public LoadCounter() {}
-
-    @Override
-    public void setMaxProcId(long maxProcId) {
-    }
-
-    @Override
-    public void load(ProcedureIterator procIter) throws IOException {
-      while (procIter.hasNext()) {
-        if (procIter.isNextCompleted()) {
-          ProcedureInfo proc = procIter.nextAsProcedureInfo();
-        } else {
-          Procedure proc = procIter.nextAsProcedure();
-        }
-      }
-    }
-
-    @Override
-    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
-      while (procIter.hasNext()) {
-        Procedure proc = procIter.nextAsProcedure();
-      }
-    }
-  }
-
-  @Override
-  protected void addOptions() {
-    addOption(NUM_PROCS_OPTION);
-    addOption(UPDATES_PER_PROC_OPTION);
-    addOption(DELETE_PROCS_FRACTION_OPTION);
-    addOption(NUM_WALS_OPTION);
-    addOption(STATE_SIZE_OPTION);
-  }
-
-  @Override
-  protected void processOptions(CommandLine cmd) {
-    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
-    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
-    int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
-    serializedState = new byte[stateSize];
-    updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(),
-        DEFAULT_UPDATES_PER_PROC);
-    deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(),
-        DEFAULT_DELETE_PROCS_FRACTION);
-    setupConf();
-  }
-
-  private void setupConf() {
-    if (numWals > 0) {
-      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
-    }
-  }
-
-  public void setUpProcedureStore() throws IOException {
-    Path testDir = UTIL.getDataTestDir();
-    FileSystem fs = testDir.getFileSystem(conf);
-    Path logDir = new Path(testDir, "proc-logs");
-    System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
-    fs.delete(logDir, true);
-    store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
-    store.start(1);
-    store.recoverLease();
-    store.load(new LoadCounter());
-  }
-
-  /**
-   * @return a list of shuffled integers which represent state of proc id. First occurrence of a
-   * number denotes insert state, consecutive occurrences denote update states, and -ve value
-   * denotes delete state.
-   */
-  private List<Integer> shuffleProcWriteSequence() {
-    Random rand = new Random();
-    List<Integer> procStatesSequence = new ArrayList<>();
-    Set<Integer> toBeDeletedProcs = new HashSet<>();
-    // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
-    // extra entry which is marked -ve in the loop after shuffle.
-    for (int procId  = 1; procId <= numProcs; ++procId) {
-      procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
-      if (rand.nextFloat() < deleteProcsFraction) {
-        procStatesSequence.add(procId);
-        toBeDeletedProcs.add(procId);
-      }
-    }
-    Collections.shuffle(procStatesSequence);
-    // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
-    for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
-      int procId = procStatesSequence.get(i);
-      if (toBeDeletedProcs.contains(procId)) {
-        procStatesSequence.set(i, -1 * procId);
-        toBeDeletedProcs.remove(procId);
-      }
-    }
-    return procStatesSequence;
-  }
-
-  private void writeWals() throws IOException {
-    List<Integer> procStates = shuffleProcWriteSequence();
-    TestProcedure[] procs = new TestProcedure[numProcs + 1];  // 0 is not used.
-    int numProcsPerWal = numWals > 0 ? (int)Math.ceil(procStates.size() / numWals)
-        : Integer.MAX_VALUE;
-    long startTime = currentTimeMillis();
-    long lastTime = startTime;
-    for (int i = 0; i < procStates.size(); ++i) {
-      int procId = procStates.get(i);
-      if (procId < 0) {
-        store.delete(procs[-procId].getProcId());
-        procs[-procId] = null;
-      } else if (procs[procId] == null) {
-        procs[procId] = new TestProcedure(procId, 0);
-        procs[procId].setData(serializedState);
-        store.insert(procs[procId], null);
-      } else {
-        store.update(procs[procId]);
-      }
-      if (i > 0 && i % numProcsPerWal == 0) {
-        long currentTime = currentTimeMillis();
-        System.out.println("Forcing wall roll. Time taken on last WAL: " +
-            (currentTime - lastTime) / 1000.0f + " sec");
-        store.rollWriterForTesting();
-        lastTime = currentTime;
-      }
-    }
-    long timeTaken = currentTimeMillis() - startTime;
-    System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + 
"\nTotal time taken : "
-        + StringUtils.humanTimeDiff(timeTaken) + "\n\n");
-  }
-
-  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
-    System.out.println("Restarting procedure store to read back the WALs");
-    store.stop(false);
-    store.start(1);
-    store.recoverLease();
-
-    long startTime = currentTimeMillis();
-    store.load(loader);
-    long timeTaken = System.currentTimeMillis() - startTime;
-    System.out.println("******************************************");
-    System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
-    System.out.println("******************************************");
-  }
-
-  public void tearDownProcedureStore() {
-    store.stop(false);
-    try {
-      store.getFileSystem().delete(store.getLogDir(), true);
-    } catch (IOException e) {
-      System.err.println("Error: Couldn't delete log dir. You can delete it 
manually to free up "
-          + "disk space. Location: " + store.getLogDir().toString());
-      System.err.println(e.toString());
-    }
-  }
-
-  @Override
-  protected int doWork() {
-    try {
-      setUpProcedureStore();
-      writeWals();
-      storeRestart(new LoadCounter());
-      return EXIT_SUCCESS;
-    } catch (IOException e) {
-      e.printStackTrace();
-      return EXIT_FAILURE;
-    } finally {
-      tearDownProcedureStore();
-    }
-  }
-
-  public static void main(String[] args) throws IOException {
-    ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
-    tool.setConf(UTIL.getConfiguration());
-    tool.run(args);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/410b8098/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
deleted file mode 100644
index 210ac43..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.store.wal;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.util.*;
-
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
-
-public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
-  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
-
-  // Command line options and defaults.
-  public static int DEFAULT_NUM_THREADS = 20;
-  public static Option NUM_THREADS_OPTION = new Option("threads", true,
-      "Number of parallel threads which will write insert/updates/deletes to 
WAL. Default: "
-      + DEFAULT_NUM_THREADS);
-  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
-  public static Option NUM_PROCS_OPTION = new Option("procs", true,
-      "Total number of procedures. Each procedure writes one insert and one 
update. Default: "
-      + DEFAULT_NUM_PROCS);
-  public static int DEFAULT_NUM_WALS = 0;
-  public static Option NUM_WALS_OPTION = new Option("wals", true,
-      "Number of WALs to write. If -ve or 0, uses " + 
WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
-          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
-  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
-  public static Option STATE_SIZE_OPTION = new Option("size", true,
-      "Size of serialized state in bytes to write on update. Default: " + 
DEFAULT_STATE_SIZE
-          + "bytes");
-  public static Option SYNC_OPTION = new Option("sync", true,
-      "Type of sync to use when writing WAL contents to file system. Accepted 
values: hflush, "
-          + "hsync, nosync. Default: hflush");
-  public static String DEFAULT_SYNC_OPTION = "hflush";
-
-  public int numThreads;
-  public long numProcs;
-  public long numProcsPerWal = Long.MAX_VALUE;  // never roll wall based on this value.
-  public int numWals;
-  public String syncType;
-  public int stateSize;
-  static byte[] serializedState;
-  private WALProcedureStore store;
-
-  /** Used by {@link Worker}. */
-  private AtomicLong procIds = new AtomicLong(0);
-  private AtomicBoolean workersFailed = new AtomicBoolean(false);
-  // Timeout for worker threads.
-  private static final int WORKER_THREADS_TIMEOUT_SEC = 600;  // in seconds
-
-  // Non-default configurations.
-  private void setupConf() {
-    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType));
-    if (numWals > 0) {
-      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
-      numProcsPerWal = numProcs / numWals;
-    }
-  }
-
-  private void setupProcedureStore() throws IOException {
-    Path testDir = UTIL.getDataTestDir();
-    FileSystem fs = testDir.getFileSystem(conf);
-    Path logDir = new Path(testDir, "proc-logs");
-    System.out.println("Logs directory : " + logDir.toString());
-    fs.delete(logDir, true);
-    if ("nosync".equals(syncType)) {
-      store = new NoSyncWalProcedureStore(conf, fs, logDir);
-    } else {
-      store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
-    }
-    store.start(numThreads);
-    store.recoverLease();
-    store.load(new ProcedureTestingUtility.LoadCounter());
-    System.out.println("Starting new log : "
-        + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
-  }
-
-  private void tearDownProcedureStore() {
-    store.stop(false);
-    try {
-      store.getFileSystem().delete(store.getLogDir(), true);
-    } catch (IOException e) {
-      System.err.println("Error: Couldn't delete log dir. You can delete it 
manually to free up "
-          + "disk space. Location: " + store.getLogDir().toString());
-      e.printStackTrace();
-    }
-  }
-
-  /**
-   * Processes and validates command line options.
-   */
-  @Override
-  public void processOptions(CommandLine cmd) {
-    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
-    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
-    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
-    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
-    assert "hsync".equals(syncType) || "hflush".equals(syncType) || 
"nosync".equals(syncType):
-        "sync argument can only accept one of these three values: hsync, 
hflush, nosync";
-    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
-    serializedState = new byte[stateSize];
-    setupConf();
-  }
-
-  @Override
-  public void addOptions() {
-    addOption(NUM_THREADS_OPTION);
-    addOption(NUM_PROCS_OPTION);
-    addOption(NUM_WALS_OPTION);
-    addOption(SYNC_OPTION);
-    addOption(STATE_SIZE_OPTION);
-  }
-
-  @Override
-  public int doWork() {
-    try {
-      setupProcedureStore();
-      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-      Future<Integer>[] futures = (Future<Integer>[]) new Object[numThreads];
-      // Start worker threads.
-      long start = System.currentTimeMillis();
-      for (int i = 0; i < numThreads; i++) {
-        futures[i] = executor.submit(this.new Worker(start));
-      }
-      boolean failure = false;
-      try {
-        for (Future<Integer> future : futures) {
-          long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
-          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
-        }
-      } catch (Exception e) {
-        System.err.println("Exception in worker thread.");
-        e.printStackTrace();
-        return EXIT_FAILURE;
-      }
-      executor.shutdown();
-      if (failure) {
-        return EXIT_FAILURE;
-      }
-      long timeTaken = System.currentTimeMillis() - start;
-      System.out.println("******************************************");
-      System.out.println("Num threads    : " + numThreads);
-      System.out.println("Num procedures : " + numProcs);
-      System.out.println("Sync type      : " + syncType);
-      System.out.println("Time taken     : " + (timeTaken / 1000.0f) + "sec");
-      System.out.println("******************************************");
-      return EXIT_SUCCESS;
-    } catch (IOException e) {
-      e.printStackTrace();
-      return EXIT_FAILURE;
-    } finally {
-      tearDownProcedureStore();
-    }
-  }
-
-  ///////////////////////////////
-  // HELPER CLASSES
-  ///////////////////////////////
-
-  /**
-   * Callable to generate load for wal by inserting/deleting/updating procedures.
-   * If procedure store fails to roll log file (throws IOException), all threads quit, and at
-   * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
-   */
-  class Worker implements Callable<Integer> {
-    final long start;
-
-    public Worker(long start) {
-      this.start = start;
-    }
-
-    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
-    @Override
-    public Integer call() throws IOException {
-      while (true) {
-        if (workersFailed.get()) {
-          return EXIT_FAILURE;
-        }
-        long procId = procIds.getAndIncrement();
-        if (procId >= numProcs) {
-          break;
-        }
-        if (procId != 0 && procId % 10000 == 0) {
-          long ms = System.currentTimeMillis() - start;
-          System.out.println("Wrote " + procId + " procedures in "
-              + StringUtils.humanTimeDiff(ms));
-        }
-        try{
-          if (procId > 0 && procId % numProcsPerWal == 0) {
-            store.rollWriterForTesting();
-            System.out.println("Starting new log : "
-                + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
-          }
-        } catch (IOException ioe) {
-          // Ask other threads to quit too.
-          workersFailed.set(true);
-          System.err.println("Exception when rolling log file. Current procId 
= " + procId);
-          ioe.printStackTrace();
-          return EXIT_FAILURE;
-        }
-        ProcedureTestingUtility.TestProcedure proc =
-            new ProcedureTestingUtility.TestProcedure(procId);
-        proc.setData(serializedState);
-        store.insert(proc, null);
-        store.update(proc);
-      }
-      return EXIT_SUCCESS;
-    }
-  }
-
-  public class NoSyncWalProcedureStore extends WALProcedureStore {
-    public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
-        final Path logDir) {
-      super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
-        @Override
-        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
-          // no-op
-        }
-      });
-    }
-
-    @Override
-    protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
-        throws IOException {
-      long totalSynced = 0;
-      for (int i = 0; i < count; ++i) {
-        totalSynced += slots[offset + i].size();
-      }
-      return totalSynced;
-    }
-  }
-
-  public static void main(String[] args) throws IOException {
-    ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
-    tool.setConf(UTIL.getConfiguration());
-    tool.run(args);
-  }
-}
\ No newline at end of file
