Repository: hbase Updated Branches: refs/heads/0.98 036c684e1 -> e8d8ca74e
HBASE-13712 Backport HBASE-13199 to branch-1 0.98 backport. Includes: HBASE-13199 Some small improvements on canary tool (Shaohui Liu) HBASE-13199 ADDENDUM Some small improvements on canary tool (Shaohui Liu) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8d8ca74 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8d8ca74 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8d8ca74 Branch: refs/heads/0.98 Commit: e8d8ca74e553479efdd75f81f705c88a7ec66947 Parents: 036c684 Author: Andrew Purtell <apurt...@apache.org> Authored: Fri May 22 12:13:27 2015 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri May 22 12:13:27 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/tool/Canary.java | 496 +++++++++++++------ 1 file changed, 339 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e8d8ca74/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 8107027..f55bc3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -19,14 +19,23 @@ package org.apache.hadoop.hbase.tool; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -46,10 +55,14 @@ import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -113,6 +126,171 @@ public final class Canary implements Tool { } } + /** + * For each column family of the region tries to get one row and outputs the latency, or the + * failure. + */ + static class RegionTask implements Callable<Void> { + private HConnection connection; + private HRegionInfo region; + private Sink sink; + + RegionTask(HConnection connection, HRegionInfo region, Sink sink) { + this.connection = connection; + this.region = region; + this.sink = sink; + } + + @Override + public Void call() { + HTableInterface table = null; + HTableDescriptor tableDesc = null; + try { + table = connection.getTable(region.getTable()); + tableDesc = table.getTableDescriptor(); + } catch (IOException e) { + LOG.debug("sniffRegion failed", e); + sink.publishReadFailure(region, e); + if (table != null) { + try { + table.close(); + } catch (IOException ioe) { + } + } + return null; + } + + byte[] startKey = null; + Get get = null; + Scan scan = null; + ResultScanner rs = null; + StopWatch stopWatch = new StopWatch(); + for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { + stopWatch.reset(); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + get.addFamily(column.getName()); + } else { + scan = new Scan(); + scan.setCaching(1); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.addFamily(column.getName()); + scan.setMaxResultSize(1L); + } + + try { + if (startKey.length > 0) { + stopWatch.start(); + table.get(get); + stopWatch.stop(); + sink.publishReadTiming(region, column, stopWatch.getTime()); + } else { + stopWatch.start(); + rs = table.getScanner(scan); + stopWatch.stop(); + sink.publishReadTiming(region, column, stopWatch.getTime()); + } + } catch (Exception e) { + sink.publishReadFailure(region, column, e); + } finally { + if (rs != null) { + rs.close(); + } + scan = null; + get = null; + startKey = null; + } + } + try { + table.close(); + } catch (IOException e) { + } + return null; + } + } + + /** + * Get one row from a region on the regionserver and outputs the latency, or the failure. + */ + static class RegionServerTask implements Callable<Void> { + private HConnection connection; + private String serverName; + private HRegionInfo region; + private ExtendedSink sink; + + RegionServerTask(HConnection connection, String serverName, HRegionInfo region, + ExtendedSink sink) { + this.connection = connection; + this.serverName = serverName; + this.region = region; + this.sink = sink; + } + + @Override + public Void call() { + TableName tableName = null; + HTableInterface table = null; + Get get = null; + byte[] startKey = null; + Scan scan = null; + StopWatch stopWatch = new StopWatch(); + // monitor one region on every region server + stopWatch.reset(); + try { + tableName = region.getTable(); + table = connection.getTable(tableName); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + stopWatch.start(); + table.get(get); + stopWatch.stop(); + } else { + scan = new Scan(); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.setCaching(1); + scan.setMaxResultSize(1L); + stopWatch.start(); + ResultScanner s = table.getScanner(scan); + s.close(); + stopWatch.stop(); + } + sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); + } catch (TableNotFoundException tnfe) { + // This is ignored because it doesn't imply that the regionserver is dead + } catch (TableNotEnabledException tnee) { + // This is considered a success since we got a response. + LOG.debug("The targeted table was disabled. Assuming success."); + } catch (DoNotRetryIOException dnrioe) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(dnrioe); + } catch (IOException e) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(e); + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) {/* DO NOTHING */ + } + } + scan = null; + get = null; + startKey = null; + } + return null; + } + } + private static final int USAGE_EXIT_CODE = 1; private static final int INIT_ERROR_EXIT_CODE = 2; private static final int TIMEOUT_ERROR_EXIT_CODE = 3; @@ -122,6 +300,8 @@ public final class Canary implements Tool { private static final long DEFAULT_TIMEOUT = 600000; // 10 mins + private static final int MAX_THREADS_NUM = 16; // #threads to contact regions + private static final Log LOG = LogFactory.getLog(Canary.class); private Configuration conf = null; @@ -132,12 +312,14 @@ public final class Canary implements Tool { private long timeout = DEFAULT_TIMEOUT; private boolean failOnError = true; private boolean regionServerMode = false; + private ExecutorService executor; // threads to retrieve data from regionservers public Canary() { - this(new RegionServerStdOutSink()); + this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink()); } - public Canary(Sink sink) { + public Canary(ExecutorService executor, Sink sink) { + this.executor = executor; this.sink = sink; } @@ -227,54 +409,65 @@ public final class Canary implements Tool { } } - // launches chore for refreshing kerberos ticket if security is enabled. + // Launches chore for refreshing kerberos credentials if security is enabled. + // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster + // for more details. AuthUtil.launchAuthChore(conf); - // start to prepare the stuffs + // Start to prepare the stuffs Monitor monitor = null; Thread monitorThread = null; long startTime = 0; long currentTimeLength = 0; + // Get a connection to use in below. + HConnection connection = HConnectionManager.createConnection(this.conf); + try { + do { + // Do monitor !! + try { + monitor = this.newMonitor(connection, index, args); + monitorThread = new Thread(monitor); + startTime = System.currentTimeMillis(); + monitorThread.start(); + while (!monitor.isDone()) { + // wait for 1 sec + Thread.sleep(1000); + // exit if any error occurs + if (this.failOnError && monitor.hasError()) { + monitorThread.interrupt(); + if (monitor.initialized) { + System.exit(monitor.errorCode); + } else { + System.exit(INIT_ERROR_EXIT_CODE); + } + } + currentTimeLength = System.currentTimeMillis() - startTime; + if (currentTimeLength > this.timeout) { + LOG.error("The monitor is running too long (" + currentTimeLength + + ") after timeout limit:" + this.timeout + + " will be killed itself !!"); + if (monitor.initialized) { + System.exit(TIMEOUT_ERROR_EXIT_CODE); + } else { + System.exit(INIT_ERROR_EXIT_CODE); + } + break; + } + } - do { - // do monitor !! - monitor = this.newMonitor(index, args); - monitorThread = new Thread(monitor); - startTime = System.currentTimeMillis(); - monitorThread.start(); - while (!monitor.isDone()) { - // wait for 1 sec - Thread.sleep(1000); - // exit if any error occurs - if (this.failOnError && monitor.hasError()) { - monitorThread.interrupt(); - if (monitor.initialized) { + if (this.failOnError && monitor.hasError()) { + monitorThread.interrupt(); System.exit(monitor.errorCode); - } else { - System.exit(INIT_ERROR_EXIT_CODE); - } - } - currentTimeLength = System.currentTimeMillis() - startTime; - if (currentTimeLength > this.timeout) { - LOG.error("The monitor is running too long (" + currentTimeLength - + ") after timeout limit:" + this.timeout - + " will be killed itself !!"); - if (monitor.initialized) { - System.exit(TIMEOUT_ERROR_EXIT_CODE); - } else { - System.exit(INIT_ERROR_EXIT_CODE); } - break; + } finally { + if (monitor != null) monitor.close(); } - } - - if (this.failOnError && monitor.hasError()) { - monitorThread.interrupt(); - System.exit(monitor.errorCode); - } - Thread.sleep(interval); - } while (interval > 0); + Thread.sleep(interval); + } while (interval > 0); + } finally { + connection.close(); + } return(monitor.errorCode); } @@ -298,13 +491,13 @@ public final class Canary implements Tool { } /** - * a Factory method for {@link Monitor}. - * Can be overrided by user. + * A Factory method for {@link Monitor}. + * Can be overridden by user. * @param index a start index for monitor target * @param args args passed from user * @return a Monitor instance */ - public Monitor newMonitor(int index, String[] args) { + public Monitor newMonitor(final HConnection connection, int index, String[] args) { Monitor monitor = null; String[] monitorTargets = null; @@ -314,22 +507,21 @@ public final class Canary implements Tool { System.arraycopy(args, index, monitorTargets, 0, length); } - if(this.regionServerMode) { - monitor = new RegionServerMonitor( - this.conf, - monitorTargets, - this.useRegExp, - (ExtendedSink)this.sink); + if (this.regionServerMode) { + monitor = + new RegionServerMonitor(connection, monitorTargets, this.useRegExp, + (ExtendedSink) this.sink, this.executor); } else { - monitor = new RegionMonitor(this.conf, monitorTargets, this.useRegExp, this.sink); + monitor = + new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor); } return monitor; } // a Monitor super-class can be extended by users - public static abstract class Monitor implements Runnable { + public static abstract class Monitor implements Runnable, Closeable { - protected Configuration config; + protected HConnection connection; protected HBaseAdmin admin; protected String[] targets; protected boolean useRegExp; @@ -338,6 +530,7 @@ public final class Canary implements Tool { protected boolean done = false; protected int errorCode = 0; protected Sink sink; + protected ExecutorService executor; public boolean isDone() { return done; @@ -347,15 +540,20 @@ public final class Canary implements Tool { return errorCode != 0; } - protected Monitor(Configuration config, String[] monitorTargets, - boolean useRegExp, Sink sink) { - if (null == config) - throw new IllegalArgumentException("config shall not be null"); + @Override + public void close() throws IOException { + if (this.admin != null) this.admin.close(); + } + + protected Monitor(HConnection connection, String[] monitorTargets, boolean useRegExp, Sink sink, + ExecutorService executor) { + if (null == connection) throw new IllegalArgumentException("connection shall not be null"); - this.config = config; + this.connection = connection; this.targets = monitorTargets; this.useRegExp = useRegExp; this.sink = sink; + this.executor = executor; } public abstract void run(); @@ -363,7 +561,7 @@ public final class Canary implements Tool { protected boolean initAdmin() { if (null == this.admin) { try { - this.admin = new HBaseAdmin(config); + this.admin = new HBaseAdmin(connection); } catch (Exception e) { LOG.error("Initial HBaseAdmin failed...", e); this.errorCode = INIT_ERROR_EXIT_CODE; @@ -379,23 +577,31 @@ public final class Canary implements Tool { // a monitor for region mode private static class RegionMonitor extends Monitor { - public RegionMonitor(Configuration config, String[] monitorTargets, - boolean useRegExp, Sink sink) { - super(config, monitorTargets, useRegExp, sink); + public RegionMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp, + Sink sink, ExecutorService executor) { + super(connection, monitorTargets, useRegExp, sink, executor); } @Override public void run() { - if(this.initAdmin()) { + if (this.initAdmin()) { try { + List<Future<Void>> taskFutures = new LinkedList<Future<Void>>(); if (this.targets != null && this.targets.length > 0) { String[] tables = generateMonitorTables(this.targets); this.initialized = true; for (String table : tables) { - Canary.sniff(admin, sink, table); + taskFutures.addAll(Canary.sniff(admin, sink, table, executor)); } } else { - sniff(); + taskFutures.addAll(sniff()); + } + for (Future<Void> future : taskFutures) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff region failed!", e); + } } } catch (Exception e) { LOG.error("Run regionMonitor failed", e); @@ -408,7 +614,7 @@ public final class Canary implements Tool { private String[] generateMonitorTables(String[] monitorTargets) throws IOException { String[] returnTables = null; - if(this.useRegExp) { + if (this.useRegExp) { Pattern pattern = null; HTableDescriptor[] tds = null; Set<String> tmpTables = new TreeSet<String>(); @@ -422,16 +628,15 @@ public final class Canary implements Tool { } } } - } catch(IOException e) { + } catch (IOException e) { LOG.error("Communicate with admin failed", e); throw e; } - if(tmpTables.size() > 0) { + if (tmpTables.size() > 0) { returnTables = tmpTables.toArray(new String[tmpTables.size()]); } else { - String msg = "No HTable found, tablePattern:" - + Arrays.toString(monitorTargets); + String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); LOG.error(msg); this.errorCode = INIT_ERROR_EXIT_CODE; throw new TableNotFoundException(msg); @@ -446,12 +651,15 @@ public final class Canary implements Tool { /* * canary entry point to monitor all the tables. */ - private void sniff() throws Exception { + private List<Future<Void>> sniff() throws Exception { + List<Future<Void>> taskFutures = new LinkedList<Future<Void>>(); for (HTableDescriptor table : admin.listTables()) { - Canary.sniff(admin, sink, table); + if (admin.isTableEnabled(table.getTableName())) { + taskFutures.addAll(Canary.sniff(admin, sink, table, executor)); + } } + return taskFutures; } - } /** @@ -459,47 +667,49 @@ public final class Canary implements Tool { * @throws Exception */ public static void sniff(final HBaseAdmin admin, TableName tableName) throws Exception { - sniff(admin, new StdOutSink(), tableName.getNameAsString()); + List<Future<Void>> taskFutures = + Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(), + new ScheduledThreadPoolExecutor(1)); + for (Future<Void> future : taskFutures) { + future.get(); + } } /** * Canary entry point for specified table. * @throws Exception */ - private static void sniff(final HBaseAdmin admin, final Sink sink, String tableName) - throws Exception { - if (admin.isTableAvailable(tableName)) { - sniff(admin, sink, admin.getTableDescriptor(tableName.getBytes())); + private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink, String tableName, + ExecutorService executor) throws Exception { + if (admin.isTableEnabled(TableName.valueOf(tableName))) { + return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)), + executor); } else { - LOG.warn(String.format("Table %s is not available", tableName)); + LOG.warn(String.format("Table %s is not enabled", tableName)); } + return new LinkedList<Future<Void>>(); } /* * Loops over regions that owns this table, and output some information abouts the state. */ - private static void sniff(final HBaseAdmin admin, final Sink sink, HTableDescriptor tableDesc) - throws Exception { - HTable table = null; - + private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink, + HTableDescriptor tableDesc, ExecutorService executor) throws Exception { + HTableInterface table = null; try { - table = new HTable(admin.getConfiguration(), tableDesc.getName()); + table = admin.getConnection().getTable(tableDesc.getTableName()); } catch (TableNotFoundException e) { - return; + return new ArrayList<Future<Void>>(); } - + List<RegionTask> tasks = new ArrayList<RegionTask>(); try { - for (HRegionInfo region : admin.getTableRegions(tableDesc.getName())) { - try { - sniffRegion(admin, sink, region, table); - } catch (Exception e) { - sink.publishReadFailure(region, e); - LOG.debug("sniffRegion failed", e); - } + for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) { + tasks.add(new RegionTask(admin.getConnection(), region, sink)); } } finally { table.close(); } + return executor.invokeAll(tasks); } /* @@ -510,7 +720,7 @@ public final class Canary implements Tool { final HBaseAdmin admin, final Sink sink, HRegionInfo region, - HTable table) throws Exception { + HTableInterface table) throws Exception { HTableDescriptor tableDesc = table.getTableDescriptor(); byte[] startKey = null; Get get = null; @@ -560,12 +770,12 @@ public final class Canary implements Tool { } } } - //a monitor for regionserver mode + // a monitor for regionserver mode private static class RegionServerMonitor extends Monitor { - public RegionServerMonitor(Configuration config, String[] monitorTargets, - boolean useRegExp, ExtendedSink sink) { - super(config, monitorTargets, useRegExp, sink); + public RegionServerMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp, + ExtendedSink sink, ExecutorService executor) { + super(connection, monitorTargets, useRegExp, sink, executor); } private ExtendedSink getSink() { @@ -613,62 +823,27 @@ public final class Canary implements Tool { } private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) { - String serverName = null; - String tableName = null; - HRegionInfo region = null; - HTable table = null; - Get get = null; - byte[] startKey = null; - Scan scan = null; - StopWatch stopWatch = new StopWatch(); + List<RegionServerTask> tasks = new ArrayList<RegionServerTask>(); + Random rand =new Random(); // monitor one region on every region server for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) { - stopWatch.reset(); - serverName = entry.getKey(); - // always get the first region - region = entry.getValue().get(0); - try { - tableName = region.getTable().getNameAsString(); - table = new HTable(this.admin.getConfiguration(), tableName); - startKey = region.getStartKey(); - // Can't do a get on empty start row so do a Scan of first element if any instead. - if(startKey.length > 0) { - get = new Get(startKey); - stopWatch.start(); - table.get(get); - stopWatch.stop(); - } else { - scan = new Scan(); - scan.setCaching(1); - scan.setMaxResultSize(1L); - stopWatch.start(); - table.getScanner(scan); - stopWatch.stop(); - } - this.getSink().publishReadTiming(tableName, serverName, stopWatch.getTime()); - } catch (TableNotFoundException tnfe) { - // This is ignored because it doesn't imply that the regionserver is dead - } catch (TableNotEnabledException tnee) { - // This is considered a success since we got a response. - LOG.debug("The targeted table was disabled. Assuming success."); - } catch (DoNotRetryIOException dnrioe) { - this.getSink().publishReadFailure(tableName, serverName); - LOG.error(dnrioe); - } catch (IOException e) { - this.getSink().publishReadFailure(tableName, serverName); - LOG.error(e); - this.errorCode = ERROR_EXIT_CODE; - } finally { - if (table != null) { - try { - table.close(); - } catch (IOException e) {/* DO NOTHING */ - } + String serverName = entry.getKey(); + // random select a region + HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); + tasks.add(new RegionServerTask(this.connection, serverName, region, getSink())); + } + try { + for (Future<Void> future : this.executor.invokeAll(tasks)) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff regionserver failed!", e); + this.errorCode = ERROR_EXIT_CODE; } - scan = null; - get = null; - startKey = null; } + } catch (InterruptedException e) { + this.errorCode = ERROR_EXIT_CODE; + LOG.error("Sniff regionserver failed!", e); } } @@ -680,18 +855,16 @@ public final class Canary implements Tool { private Map<String, List<HRegionInfo>> getAllRegionServerByName() { Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>(); - HTable table = null; + HTableInterface table = null; try { HTableDescriptor[] tableDescs = this.admin.listTables(); List<HRegionInfo> regions = null; for (HTableDescriptor tableDesc : tableDescs) { - table = new HTable(this.admin.getConfiguration(), tableDesc.getName()); - - for (Map.Entry<HRegionInfo, ServerName> entry : table - .getRegionLocations().entrySet()) { - ServerName rs = entry.getValue(); + table = this.admin.getConnection().getTable(tableDesc.getTableName()); + for (Entry<HRegionInfo, ServerName> e: ((HTable)table).getRegionLocations().entrySet()) { + HRegionInfo r = e.getKey(); + ServerName rs = e.getValue(); String rsName = rs.getHostname(); - HRegionInfo r = entry.getKey(); if (rsAndRMap.containsKey(rsName)) { regions = rsAndRMap.get(rsName); @@ -735,7 +908,7 @@ public final class Canary implements Tool { if (this.useRegExp) { regExpFound = false; pattern = Pattern.compile(rsName); - for (Map.Entry<String,List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) { + for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) { matcher = pattern.matcher(entry.getKey()); if (matcher.matches()) { filteredRsAndRMap.put(entry.getKey(), entry.getValue()); @@ -762,7 +935,16 @@ public final class Canary implements Tool { public static void main(String[] args) throws Exception { final Configuration conf = HBaseConfiguration.create(); - int exitCode = ToolRunner.run(conf, new Canary(), args); + AuthUtil.launchAuthChore(conf); + int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); + ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); + + Class<? extends Sink> sinkClass = + conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class); + Sink sink = ReflectionUtils.newInstance(sinkClass); + + int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args); + executor.shutdown(); System.exit(exitCode); } }