Repository: hbase Updated Branches: refs/heads/branch-1 4ec59d018 -> 15d16281a
HBASE-19290 Reduce zk request when doing split log Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15d16281 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15d16281 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15d16281 Branch: refs/heads/branch-1 Commit: 15d16281a1c3d246dc991064c7828b77baaaaf6f Parents: 4ec59d0 Author: binlijin <binli...@gmail.com> Authored: Mon Dec 4 18:36:11 2017 +0800 Committer: binlijin <binli...@gmail.com> Committed: Mon Dec 4 18:36:11 2017 +0800 ---------------------------------------------------------------------- .../ZkSplitLogWorkerCoordination.java | 79 +++++++++++++------- 1 file changed, 52 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/15d16281/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 7e6708e..f22a62a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coordination; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -206,27 +207,28 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements * try to grab a 'lock' on the task zk node to own and execute the task. * <p> * @param path zk node for the task + * @return boolean value when grab a task success return true otherwise false */ - private void grabTask(String path) { + private boolean grabTask(String path) { Stat stat = new Stat(); byte[] data; synchronized (grabTaskLock) { currentTask = path; workerInGrabTask = true; if (Thread.interrupted()) { - return; + return false; } } try { try { if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); - return; + return false; } } catch (KeeperException e) { LOG.warn("Failed to get data for znode " + path, e); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; + return false; } SplitLogTask slt; try { @@ -234,11 +236,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } catch (DeserializationException e) { LOG.warn("Failed parse data for znode " + path, e); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; + return false; } if (!slt.isUnassigned()) { SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); - return; + return false; } currentVersion = @@ -246,7 +248,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements slt.getMode(), stat.getVersion()); if (currentVersion < 0) { SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); - return; + return false; } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { @@ -257,7 +259,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()), SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); - return; + return false; } LOG.info("worker " + server.getServerName() + " acquired task " + path); @@ -274,6 +276,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements LOG.warn("Interrupted while yielding for other region servers", e); Thread.currentThread().interrupt(); } + return true; } finally { synchronized (grabTaskLock) { workerInGrabTask = false; @@ -325,12 +328,13 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } /** - * This function calculates how many splitters it could create based on expected average tasks per - * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br> + * This function calculates how many splitters this RS should create based on expected average + * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br> * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks current total number of available tasks + * @param numTasks total number of split tasks available + * @return number of tasks this RS can grab */ - private int calculateAvailableSplitters(int numTasks) { + private int getNumExpectedTasksPerRS(int numTasks) { // at lease one RS(itself) available int availableRSs = 1; try { @@ -341,12 +345,17 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements // do nothing LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); } - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one - // calculate how many more splitters we could spawn - return Math.min(expectedTasksPerRS, maxConcurrentTasks) - - this.tasksInProgress.get(); + return Math.max(1, expectedTasksPerRS); // at least be one + } + + /** + * @param expectedTasksPerRS Average number of tasks to be handled by each RS + * @return true if more splitters are available, otherwise false. + */ + private boolean areSplittersAvailable(int expectedTasksPerRS) { + return (Math.min(expectedTasksPerRS, maxConcurrentTasks) + - this.tasksInProgress.get()) > 0; } /** @@ -415,8 +424,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements + " ... worker thread exiting."); return; } + // shuffle the paths to prevent different split log worker start from the same log file after + // meta log (if any) + Collections.shuffle(paths); // pick meta wal firstly - int offset = (int) (Math.random() * paths.size()); + int offset = 0; for (int i = 0; i < paths.size(); i++) { if (DefaultWALProvider.isMetaFile(paths.get(i))) { offset = i; @@ -424,21 +436,34 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } } int numTasks = paths.size(); + int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks); + boolean taskGrabbed = false; for (int i = 0; i < numTasks; i++) { - int idx = (i + offset) % paths.size(); - // don't call ZKSplitLog.getNodeName() because that will lead to - // double encoding of the path name - if (this.calculateAvailableSplitters(numTasks) > 0) { - grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); - } else { - LOG.debug("Current region server " + server.getServerName() + " has " - + this.tasksInProgress.get() + " tasks in progress and can't take more."); - break; + while (!shouldStop) { + if (this.areSplittersAvailable(expectedTasksPerRS)) { + LOG.debug("Current region server " + server.getServerName() + + " is ready to take more tasks, will get task list and try grab tasks again."); + int idx = (i + offset) % paths.size(); + // don't call ZKSplitLog.getNodeName() because that will lead to + // double encoding of the path name + taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, + paths.get(idx))); + break; + } else { + LOG.debug("Current region server " + server.getServerName() + + " has " + this.tasksInProgress.get() + + " tasks in progress and can't take more."); + Thread.sleep(100); + } } if (shouldStop) { return; } } + if (!taskGrabbed && !shouldStop) { + // do not grab any tasks, sleep a little bit to reduce zk request. + Thread.sleep(1000); + } SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); synchronized (taskReadyLock) { while (seq_start == taskReadySeq.get()) {