Repository: hbase
Updated Branches:
  refs/heads/0.98 036c684e1 -> e8d8ca74e


HBASE-13712 Backport HBASE-13199 to branch-1

0.98 backport. Includes:
HBASE-13199 Some small improvements on canary tool (Shaohui Liu)
HBASE-13199 ADDENDUM Some small improvements on canary tool (Shaohui Liu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8d8ca74
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8d8ca74
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8d8ca74

Branch: refs/heads/0.98
Commit: e8d8ca74e553479efdd75f81f705c88a7ec66947
Parents: 036c684
Author: Andrew Purtell <apurt...@apache.org>
Authored: Fri May 22 12:13:27 2015 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Fri May 22 12:13:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/tool/Canary.java    | 496 +++++++++++++------
 1 file changed, 339 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e8d8ca74/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 8107027..f55bc3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -19,14 +19,23 @@
 
 package org.apache.hadoop.hbase.tool;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -46,10 +55,14 @@ import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -113,6 +126,171 @@ public final class Canary implements Tool {
     }
   }
 
+  /**
+   * For each column family of the region tries to get one row and outputs the latency, or the
+   * failure.
+   */
+  static class RegionTask implements Callable<Void> {
+    private HConnection connection;
+    private HRegionInfo region;
+    private Sink sink;
+
+    RegionTask(HConnection connection, HRegionInfo region, Sink sink) {
+      this.connection = connection;
+      this.region = region;
+      this.sink = sink;
+    }
+
+    @Override
+    public Void call() {
+      HTableInterface table = null;
+      HTableDescriptor tableDesc = null;
+      try {
+        table = connection.getTable(region.getTable());
+        tableDesc = table.getTableDescriptor();
+      } catch (IOException e) {
+        LOG.debug("sniffRegion failed", e);
+        sink.publishReadFailure(region, e);
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+          }
+        }
+        return null;
+      }
+
+      byte[] startKey = null;
+      Get get = null;
+      Scan scan = null;
+      ResultScanner rs = null;
+      StopWatch stopWatch = new StopWatch();
+      for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
+        stopWatch.reset();
+        startKey = region.getStartKey();
+        // Can't do a get on empty start row so do a Scan of first element if any instead.
+        if (startKey.length > 0) {
+          get = new Get(startKey);
+          get.setCacheBlocks(false);
+          get.setFilter(new FirstKeyOnlyFilter());
+          get.addFamily(column.getName());
+        } else {
+          scan = new Scan();
+          scan.setCaching(1);
+          scan.setCacheBlocks(false);
+          scan.setFilter(new FirstKeyOnlyFilter());
+          scan.addFamily(column.getName());
+          scan.setMaxResultSize(1L);
+        }
+
+        try {
+          if (startKey.length > 0) {
+            stopWatch.start();
+            table.get(get);
+            stopWatch.stop();
+            sink.publishReadTiming(region, column, stopWatch.getTime());
+          } else {
+            stopWatch.start();
+            rs = table.getScanner(scan);
+            stopWatch.stop();
+            sink.publishReadTiming(region, column, stopWatch.getTime());
+          }
+        } catch (Exception e) {
+          sink.publishReadFailure(region, column, e);
+        } finally {
+          if (rs != null) {
+            rs.close();
+          }
+          scan = null;
+          get = null;
+          startKey = null;
+        }
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Get one row from a region on the regionserver and outputs the latency, or the failure.
+   */
+  static class RegionServerTask implements Callable<Void> {
+    private HConnection connection;
+    private String serverName;
+    private HRegionInfo region;
+    private ExtendedSink sink;
+
+    RegionServerTask(HConnection connection, String serverName, HRegionInfo region,
+        ExtendedSink sink) {
+      this.connection = connection;
+      this.serverName = serverName;
+      this.region = region;
+      this.sink = sink;
+    }
+
+    @Override
+    public Void call() {
+      TableName tableName = null;
+      HTableInterface table = null;
+      Get get = null;
+      byte[] startKey = null;
+      Scan scan = null;
+      StopWatch stopWatch = new StopWatch();
+      // monitor one region on every region server
+      stopWatch.reset();
+      try {
+        tableName = region.getTable();
+        table = connection.getTable(tableName);
+        startKey = region.getStartKey();
+        // Can't do a get on empty start row so do a Scan of first element if any instead.
+        if (startKey.length > 0) {
+          get = new Get(startKey);
+          get.setCacheBlocks(false);
+          get.setFilter(new FirstKeyOnlyFilter());
+          stopWatch.start();
+          table.get(get);
+          stopWatch.stop();
+        } else {
+          scan = new Scan();
+          scan.setCacheBlocks(false);
+          scan.setFilter(new FirstKeyOnlyFilter());
+          scan.setCaching(1);
+          scan.setMaxResultSize(1L);
+          stopWatch.start();
+          ResultScanner s = table.getScanner(scan);
+          s.close();
+          stopWatch.stop();
+        }
+        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
+      } catch (TableNotFoundException tnfe) {
+        // This is ignored because it doesn't imply that the regionserver is dead
+      } catch (TableNotEnabledException tnee) {
+        // This is considered a success since we got a response.
+        LOG.debug("The targeted table was disabled.  Assuming success.");
+      } catch (DoNotRetryIOException dnrioe) {
+        sink.publishReadFailure(tableName.getNameAsString(), serverName);
+        LOG.error(dnrioe);
+      } catch (IOException e) {
+        sink.publishReadFailure(tableName.getNameAsString(), serverName);
+        LOG.error(e);
+      } finally {
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException e) {/* DO NOTHING */
+          }
+        }
+        scan = null;
+        get = null;
+        startKey = null;
+      }
+      return null;
+    }
+  }
+
   private static final int USAGE_EXIT_CODE = 1;
   private static final int INIT_ERROR_EXIT_CODE = 2;
   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
@@ -122,6 +300,8 @@ public final class Canary implements Tool {
 
   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
 
+  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
+
   private static final Log LOG = LogFactory.getLog(Canary.class);
 
   private Configuration conf = null;
@@ -132,12 +312,14 @@ public final class Canary implements Tool {
   private long timeout = DEFAULT_TIMEOUT;
   private boolean failOnError = true;
   private boolean regionServerMode = false;
+  private ExecutorService executor; // threads to retrieve data from regionservers
 
   public Canary() {
-    this(new RegionServerStdOutSink());
+    this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
   }
 
-  public Canary(Sink sink) {
+  public Canary(ExecutorService executor, Sink sink) {
+    this.executor = executor;
     this.sink = sink;
   }
 
@@ -227,54 +409,65 @@ public final class Canary implements Tool {
       }
     }
 
-    // launches chore for refreshing kerberos ticket if security is enabled.
+    // Launches chore for refreshing kerberos credentials if security is enabled.
+    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
+    // for more details.
     AuthUtil.launchAuthChore(conf);
 
-    // start to prepare the stuffs
+    // Start to prepare the stuffs
     Monitor monitor = null;
     Thread monitorThread = null;
     long startTime = 0;
     long currentTimeLength = 0;
+    // Get a connection to use in below.
+    HConnection connection = HConnectionManager.createConnection(this.conf);
+    try {
+      do {
+        // Do monitor !!
+        try {
+          monitor = this.newMonitor(connection, index, args);
+          monitorThread = new Thread(monitor);
+          startTime = System.currentTimeMillis();
+          monitorThread.start();
+          while (!monitor.isDone()) {
+            // wait for 1 sec
+            Thread.sleep(1000);
+            // exit if any error occurs
+            if (this.failOnError && monitor.hasError()) {
+              monitorThread.interrupt();
+              if (monitor.initialized) {
+                System.exit(monitor.errorCode);
+              } else {
+                System.exit(INIT_ERROR_EXIT_CODE);
+              }
+            }
+            currentTimeLength = System.currentTimeMillis() - startTime;
+            if (currentTimeLength > this.timeout) {
+              LOG.error("The monitor is running too long (" + currentTimeLength
+                  + ") after timeout limit:" + this.timeout
+                  + " will be killed itself !!");
+              if (monitor.initialized) {
+                System.exit(TIMEOUT_ERROR_EXIT_CODE);
+              } else {
+                System.exit(INIT_ERROR_EXIT_CODE);
+              }
+              break;
+            }
+          }
 
-    do {
-      // do monitor !!
-      monitor = this.newMonitor(index, args);
-      monitorThread = new Thread(monitor);
-      startTime = System.currentTimeMillis();
-      monitorThread.start();
-      while (!monitor.isDone()) {
-        // wait for 1 sec
-        Thread.sleep(1000);
-        // exit if any error occurs
-        if (this.failOnError && monitor.hasError()) {
-          monitorThread.interrupt();
-          if (monitor.initialized) {
+          if (this.failOnError && monitor.hasError()) {
+            monitorThread.interrupt();
             System.exit(monitor.errorCode);
-          } else {
-            System.exit(INIT_ERROR_EXIT_CODE);
-          }
-        }
-        currentTimeLength = System.currentTimeMillis() - startTime;
-        if (currentTimeLength > this.timeout) {
-          LOG.error("The monitor is running too long (" + currentTimeLength
-              + ") after timeout limit:" + this.timeout
-              + " will be killed itself !!");
-          if (monitor.initialized) {
-            System.exit(TIMEOUT_ERROR_EXIT_CODE);
-          } else {
-            System.exit(INIT_ERROR_EXIT_CODE);
           }
-          break;
+        } finally {
+          if (monitor != null) monitor.close();
         }
-      }
-
-      if (this.failOnError && monitor.hasError()) {
-        monitorThread.interrupt();
-        System.exit(monitor.errorCode);
-      }
 
-      Thread.sleep(interval);
-    } while (interval > 0);
+        Thread.sleep(interval);
+      } while (interval > 0);
+    } finally {
+      connection.close();
+    }
 
     return(monitor.errorCode);
   }
@@ -298,13 +491,13 @@ public final class Canary implements Tool {
   }
 
   /**
-   * a Factory method for {@link Monitor}.
-   * Can be overrided by user.
+   * A Factory method for {@link Monitor}.
+   * Can be overridden by user.
    * @param index a start index for monitor target
    * @param args args passed from user
    * @return a Monitor instance
    */
-  public Monitor newMonitor(int index, String[] args) {
+  public Monitor newMonitor(final HConnection connection, int index, String[] args) {
     Monitor monitor = null;
     String[] monitorTargets = null;
 
@@ -314,22 +507,21 @@ public final class Canary implements Tool {
       System.arraycopy(args, index, monitorTargets, 0, length);
     }
 
-    if(this.regionServerMode) {
-      monitor = new RegionServerMonitor(
-          this.conf,
-          monitorTargets,
-          this.useRegExp,
-          (ExtendedSink)this.sink);
+    if (this.regionServerMode) {
+      monitor =
+          new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
+              (ExtendedSink) this.sink, this.executor);
     } else {
-      monitor = new RegionMonitor(this.conf, monitorTargets, this.useRegExp, this.sink);
+      monitor =
+          new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor);
     }
     return monitor;
   }
 
   // a Monitor super-class can be extended by users
-  public static abstract class Monitor implements Runnable {
+  public static abstract class Monitor implements Runnable, Closeable {
 
-    protected Configuration config;
+    protected HConnection connection;
     protected HBaseAdmin admin;
     protected String[] targets;
     protected boolean useRegExp;
@@ -338,6 +530,7 @@ public final class Canary implements Tool {
     protected boolean done = false;
     protected int errorCode = 0;
     protected Sink sink;
+    protected ExecutorService executor;
 
     public boolean isDone() {
       return done;
@@ -347,15 +540,20 @@ public final class Canary implements Tool {
       return errorCode != 0;
     }
 
-    protected Monitor(Configuration config, String[] monitorTargets,
-        boolean useRegExp, Sink sink) {
-      if (null == config)
-        throw new IllegalArgumentException("config shall not be null");
+    @Override
+    public void close() throws IOException {
+      if (this.admin != null) this.admin.close();
+    }
+
+    protected Monitor(HConnection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
+        ExecutorService executor) {
+      if (null == connection) throw new IllegalArgumentException("connection shall not be null");
 
-      this.config = config;
+      this.connection = connection;
       this.targets = monitorTargets;
       this.useRegExp = useRegExp;
       this.sink = sink;
+      this.executor = executor;
     }
 
     public abstract void run();
@@ -363,7 +561,7 @@ public final class Canary implements Tool {
     protected boolean initAdmin() {
       if (null == this.admin) {
         try {
-          this.admin = new HBaseAdmin(config);
+          this.admin = new HBaseAdmin(connection);
         } catch (Exception e) {
           LOG.error("Initial HBaseAdmin failed...", e);
           this.errorCode = INIT_ERROR_EXIT_CODE;
@@ -379,23 +577,31 @@ public final class Canary implements Tool {
   // a monitor for region mode
   private static class RegionMonitor extends Monitor {
 
-    public RegionMonitor(Configuration config, String[] monitorTargets,
-        boolean useRegExp, Sink sink) {
-      super(config, monitorTargets, useRegExp, sink);
+    public RegionMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp,
+        Sink sink, ExecutorService executor) {
+      super(connection, monitorTargets, useRegExp, sink, executor);
     }
 
     @Override
     public void run() {
-      if(this.initAdmin()) {
+      if (this.initAdmin()) {
         try {
+          List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
           if (this.targets != null && this.targets.length > 0) {
             String[] tables = generateMonitorTables(this.targets);
             this.initialized = true;
             for (String table : tables) {
-              Canary.sniff(admin, sink, table);
+              taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
             }
           } else {
-            sniff();
+            taskFutures.addAll(sniff());
+          }
+          for (Future<Void> future : taskFutures) {
+            try {
+              future.get();
+            } catch (ExecutionException e) {
+              LOG.error("Sniff region failed!", e);
+            }
           }
         } catch (Exception e) {
           LOG.error("Run regionMonitor failed", e);
@@ -408,7 +614,7 @@ public final class Canary implements Tool {
     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
       String[] returnTables = null;
 
-      if(this.useRegExp) {
+      if (this.useRegExp) {
         Pattern pattern = null;
         HTableDescriptor[] tds = null;
         Set<String> tmpTables = new TreeSet<String>();
@@ -422,16 +628,15 @@ public final class Canary implements Tool {
               }
             }
           }
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOG.error("Communicate with admin failed", e);
           throw e;
         }
 
-        if(tmpTables.size() > 0) {
+        if (tmpTables.size() > 0) {
           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
         } else {
-          String msg = "No HTable found, tablePattern:"
-              + Arrays.toString(monitorTargets);
+          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
           LOG.error(msg);
           this.errorCode = INIT_ERROR_EXIT_CODE;
           throw new TableNotFoundException(msg);
@@ -446,12 +651,15 @@ public final class Canary implements Tool {
     /*
      * canary entry point to monitor all the tables.
      */
-    private void sniff() throws Exception {
+    private List<Future<Void>> sniff() throws Exception {
+      List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
       for (HTableDescriptor table : admin.listTables()) {
-        Canary.sniff(admin, sink, table);
+        if (admin.isTableEnabled(table.getTableName())) {
+          taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
+        }
       }
+      return taskFutures;
     }
-
   }
 
   /**
@@ -459,47 +667,49 @@ public final class Canary implements Tool {
    * @throws Exception
    */
   public static void sniff(final HBaseAdmin admin, TableName tableName) throws Exception {
-    sniff(admin, new StdOutSink(), tableName.getNameAsString());
+    List<Future<Void>> taskFutures =
+        Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
+          new ScheduledThreadPoolExecutor(1));
+    for (Future<Void> future : taskFutures) {
+      future.get();
+    }
   }
 
   /**
    * Canary entry point for specified table.
    * @throws Exception
    */
-  private static void sniff(final HBaseAdmin admin, final Sink sink, String tableName)
-      throws Exception {
-    if (admin.isTableAvailable(tableName)) {
-      sniff(admin, sink, admin.getTableDescriptor(tableName.getBytes()));
+  private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink, String tableName,
+      ExecutorService executor) throws Exception {
+    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
+      return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
+        executor);
     } else {
-      LOG.warn(String.format("Table %s is not available", tableName));
+      LOG.warn(String.format("Table %s is not enabled", tableName));
     }
+    return new LinkedList<Future<Void>>();
   }
 
   /*
    * Loops over regions that owns this table, and output some information abouts the state.
    */
-  private static void sniff(final HBaseAdmin admin, final Sink sink, HTableDescriptor tableDesc)
-      throws Exception {
-    HTable table = null;
-
+  private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink,
+      HTableDescriptor tableDesc, ExecutorService executor) throws Exception {
+    HTableInterface table = null;
     try {
-      table = new HTable(admin.getConfiguration(), tableDesc.getName());
+      table = admin.getConnection().getTable(tableDesc.getTableName());
     } catch (TableNotFoundException e) {
-      return;
+      return new ArrayList<Future<Void>>();
     }
-
+    List<RegionTask> tasks = new ArrayList<RegionTask>();
     try {
-      for (HRegionInfo region : admin.getTableRegions(tableDesc.getName())) {
-        try {
-          sniffRegion(admin, sink, region, table);
-        } catch (Exception e) {
-          sink.publishReadFailure(region, e);
-          LOG.debug("sniffRegion failed", e);
-        }
+      for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
+        tasks.add(new RegionTask(admin.getConnection(), region, sink));
       }
     } finally {
       table.close();
     }
+    return executor.invokeAll(tasks);
   }
 
   /*
@@ -510,7 +720,7 @@ public final class Canary implements Tool {
       final HBaseAdmin admin,
       final Sink sink,
       HRegionInfo region,
-      HTable table) throws Exception {
+      HTableInterface table) throws Exception {
     HTableDescriptor tableDesc = table.getTableDescriptor();
     byte[] startKey = null;
     Get get = null;
@@ -560,12 +770,12 @@ public final class Canary implements Tool {
       }
     }
   }
-  //a monitor for regionserver mode
+  // a monitor for regionserver mode
   private static class RegionServerMonitor extends Monitor {
 
-    public RegionServerMonitor(Configuration config, String[] monitorTargets,
-        boolean useRegExp, ExtendedSink sink) {
-      super(config, monitorTargets, useRegExp, sink);
+    public RegionServerMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp,
+        ExtendedSink sink, ExecutorService executor) {
+      super(connection, monitorTargets, useRegExp, sink, executor);
     }
 
     private ExtendedSink getSink() {
@@ -613,62 +823,27 @@ public final class Canary implements Tool {
     }
 
     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
-      String serverName = null;
-      String tableName = null;
-      HRegionInfo region = null;
-      HTable table = null;
-      Get get = null;
-      byte[] startKey = null;
-      Scan scan = null;
-      StopWatch stopWatch = new StopWatch();
+      List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
+      Random rand =new Random();
       // monitor one region on every region server
       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
-        stopWatch.reset();
-        serverName = entry.getKey();
-        // always get the first region
-        region = entry.getValue().get(0);
-        try {
-          tableName = region.getTable().getNameAsString();
-          table = new HTable(this.admin.getConfiguration(), tableName);
-          startKey = region.getStartKey();
-          // Can't do a get on empty start row so do a Scan of first element if any instead.
-          if(startKey.length > 0) {
-            get = new Get(startKey);
-            stopWatch.start();
-            table.get(get);
-            stopWatch.stop();
-          } else {
-            scan = new Scan();
-            scan.setCaching(1);
-            scan.setMaxResultSize(1L);
-            stopWatch.start();
-            table.getScanner(scan);
-            stopWatch.stop();
-          }
-          this.getSink().publishReadTiming(tableName, serverName, stopWatch.getTime());
-        } catch (TableNotFoundException tnfe) {
-          // This is ignored because it doesn't imply that the regionserver is dead
-        } catch (TableNotEnabledException tnee) {
-          // This is considered a success since we got a response.
-          LOG.debug("The targeted table was disabled.  Assuming success.");
-        } catch (DoNotRetryIOException dnrioe) {
-            this.getSink().publishReadFailure(tableName, serverName);
-            LOG.error(dnrioe);
-        } catch (IOException e) {
-          this.getSink().publishReadFailure(tableName, serverName);
-          LOG.error(e);
-          this.errorCode = ERROR_EXIT_CODE;
-        } finally {
-          if (table != null) {
-            try {
-              table.close();
-            } catch (IOException e) {/* DO NOTHING */
-            }
+        String serverName = entry.getKey();
+        // random select a region
+        HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
+        tasks.add(new RegionServerTask(this.connection, serverName, region, getSink()));
+      }
+      try {
+        for (Future<Void> future : this.executor.invokeAll(tasks)) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            LOG.error("Sniff regionserver failed!", e);
+            this.errorCode = ERROR_EXIT_CODE;
           }
-          scan = null;
-          get = null;
-          startKey = null;
         }
+      } catch (InterruptedException e) {
+        this.errorCode = ERROR_EXIT_CODE;
+        LOG.error("Sniff regionserver failed!", e);
       }
     }
 
@@ -680,18 +855,16 @@ public final class Canary implements Tool {
 
     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
-      HTable table = null;
+      HTableInterface table = null;
       try {
         HTableDescriptor[] tableDescs = this.admin.listTables();
         List<HRegionInfo> regions = null;
         for (HTableDescriptor tableDesc : tableDescs) {
-          table = new HTable(this.admin.getConfiguration(), tableDesc.getName());
-
-          for (Map.Entry<HRegionInfo, ServerName> entry : table
-              .getRegionLocations().entrySet()) {
-            ServerName rs = entry.getValue();
+          table = this.admin.getConnection().getTable(tableDesc.getTableName());
+          for (Entry<HRegionInfo, ServerName> e: ((HTable)table).getRegionLocations().entrySet()) {
+            HRegionInfo r = e.getKey();
+            ServerName rs = e.getValue();
             String rsName = rs.getHostname();
-            HRegionInfo r = entry.getKey();
 
             if (rsAndRMap.containsKey(rsName)) {
               regions = rsAndRMap.get(rsName);
@@ -735,7 +908,7 @@ public final class Canary implements Tool {
           if (this.useRegExp) {
             regExpFound = false;
             pattern = Pattern.compile(rsName);
-            for (Map.Entry<String,List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
+            for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
               matcher = pattern.matcher(entry.getKey());
               if (matcher.matches()) {
                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
@@ -762,7 +935,16 @@ public final class Canary implements Tool {
 
   public static void main(String[] args) throws Exception {
     final Configuration conf = HBaseConfiguration.create();
-    int exitCode = ToolRunner.run(conf, new Canary(), args);
+    AuthUtil.launchAuthChore(conf);
+    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
+    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
+
+    Class<? extends Sink> sinkClass =
+        conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class);
+    Sink sink = ReflectionUtils.newInstance(sinkClass);
+
+    int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
+    executor.shutdown();
     System.exit(exitCode);
   }
 }

Reply via email to