HBASE-18772 [JDK8] Replace AtomicLong with LongAdder Signed-off-by: Chia-Ping Tsai <chia7...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eb5e4367 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eb5e4367 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eb5e4367 Branch: refs/heads/HBASE-18467 Commit: eb5e43673c3aa93a0eb7af82b79c764f351bfcf7 Parents: a66bd04 Author: Yechao Chen <chenyec...@gmail.com> Authored: Wed Sep 13 04:46:14 2017 +0800 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Wed Sep 13 06:09:51 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/SplitLogCounters.java | 114 +++++++++---------- .../SplitLogWorkerCoordination.java | 6 +- .../ZKSplitLogManagerCoordination.java | 56 ++++----- .../ZkSplitLogWorkerCoordination.java | 34 +++--- .../hadoop/hbase/io/hfile/LruBlockCache.java | 20 ++-- .../hbase/io/hfile/bucket/BucketAllocator.java | 6 +- .../hbase/io/hfile/bucket/BucketCache.java | 37 +++--- .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 16 +-- .../hadoop/hbase/master/SplitLogManager.java | 12 +- .../apache/hadoop/hbase/mob/MobFileCache.java | 21 ++-- .../hadoop/hbase/regionserver/ChunkCreator.java | 8 +- .../hadoop/hbase/regionserver/HRegion.java | 28 ++--- .../regionserver/MetricsRegionWrapperImpl.java | 12 +- .../regionserver/RegionServerAccounting.java | 18 +-- .../handler/WALSplitterHandler.java | 2 +- .../org/apache/hadoop/hbase/tool/Canary.java | 35 +++--- .../io/hfile/bucket/TestBucketWriterThread.java | 5 +- .../master/TestDistributedLogSplitting.java | 19 ++-- .../hbase/master/TestSplitLogManager.java | 43 +++---- .../hbase/regionserver/TestSplitLogWorker.java | 16 +-- .../hadoop/hbase/thrift/IncrementCoalescer.java | 22 ++-- 21 files changed, 268 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java index bde1b88..8913514 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase; * limitations under the License. */ import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -28,69 +28,69 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class SplitLogCounters { - //SplitLogManager counters - public final static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_batch_success = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_new_unexpected_wals = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_start = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_success = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_result = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_result = new 
AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); - public final static AtomicLong tot_mgr_null_data = new AtomicLong(0); - public final static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); - public final static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); - public final static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0); - public final static AtomicLong tot_mgr_missing_state_in_delete = new AtomicLong(0); - public final static AtomicLong tot_mgr_heartbeat = new AtomicLong(0); - public final static AtomicLong tot_mgr_rescan = new AtomicLong(0); - public final static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0); - public final static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); - public final static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_dead_server_task = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_force = new AtomicLong(0); + //SplitLogManager counters + public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder(); + public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder(); + public final static LongAdder tot_mgr_log_split_batch_err = new 
LongAdder(); + public final static LongAdder tot_mgr_new_unexpected_wals = new LongAdder(); + public final static LongAdder tot_mgr_log_split_start = new LongAdder(); + public final static LongAdder tot_mgr_log_split_success = new LongAdder(); + public final static LongAdder tot_mgr_log_split_err = new LongAdder(); + public final static LongAdder tot_mgr_node_create_queued = new LongAdder(); + public final static LongAdder tot_mgr_node_create_result = new LongAdder(); + public final static LongAdder tot_mgr_node_already_exists = new LongAdder(); + public final static LongAdder tot_mgr_node_create_err = new LongAdder(); + public final static LongAdder tot_mgr_node_create_retry = new LongAdder(); + public final static LongAdder tot_mgr_get_data_queued = new LongAdder(); + public final static LongAdder tot_mgr_get_data_result = new LongAdder(); + public final static LongAdder tot_mgr_get_data_nonode = new LongAdder(); + public final static LongAdder tot_mgr_get_data_err = new LongAdder(); + public final static LongAdder tot_mgr_get_data_retry = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_queued = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_result = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_err = new LongAdder(); + public final static LongAdder tot_mgr_resubmit = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_failed = new LongAdder(); + public final static LongAdder tot_mgr_null_data = new LongAdder(); + public final static LongAdder tot_mgr_orphan_task_acquired = new LongAdder(); + public final static LongAdder tot_mgr_wait_for_zk_delete = new LongAdder(); + public final static LongAdder tot_mgr_unacquired_orphan_done = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_threshold_reached = new LongAdder(); + public final static LongAdder tot_mgr_missing_state_in_delete = new LongAdder(); + public final static LongAdder tot_mgr_heartbeat = new LongAdder(); + 
public final static LongAdder tot_mgr_rescan = new LongAdder(); + public final static LongAdder tot_mgr_rescan_deleted = new LongAdder(); + public final static LongAdder tot_mgr_task_deleted = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_unassigned = new LongAdder(); + public final static LongAdder tot_mgr_relist_logdir = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_dead_server_task = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_force = new LongAdder(); // SplitLogWorker counters - public final static AtomicLong tot_wkr_failed_to_grab_task_no_data = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_exception = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_owned = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_lost_race = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_acquired = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_resigned = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_done = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_err = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_result = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0); - public final static AtomicLong tot_wkr_preempt_task = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0); - public final static AtomicLong tot_wkr_final_transition_failed = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_grabing = new AtomicLong(0); + public final static LongAdder tot_wkr_failed_to_grab_task_no_data = new 
LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_exception = new LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_owned = new LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_lost_race = new LongAdder(); + public final static LongAdder tot_wkr_task_acquired = new LongAdder(); + public final static LongAdder tot_wkr_task_resigned = new LongAdder(); + public final static LongAdder tot_wkr_task_done = new LongAdder(); + public final static LongAdder tot_wkr_task_err = new LongAdder(); + public final static LongAdder tot_wkr_task_heartbeat = new LongAdder(); + public final static LongAdder tot_wkr_task_acquired_rescan = new LongAdder(); + public final static LongAdder tot_wkr_get_data_queued = new LongAdder(); + public final static LongAdder tot_wkr_get_data_result = new LongAdder(); + public final static LongAdder tot_wkr_get_data_retry = new LongAdder(); + public final static LongAdder tot_wkr_preempt_task = new LongAdder(); + public final static LongAdder tot_wkr_task_heartbeat_failed = new LongAdder(); + public final static LongAdder tot_wkr_final_transition_failed = new LongAdder(); + public final static LongAdder tot_wkr_task_grabing = new LongAdder(); public static void resetCounters() throws Exception { Class<?> cl = SplitLogCounters.class; for (Field fld : cl.getDeclaredFields()) { /* Guard against source instrumentation. 
*/ - if ((!fld.isSynthetic()) && (AtomicLong.class.isAssignableFrom(fld.getType()))) { - ((AtomicLong)fld.get(null)).set(0); + if ((!fld.isSynthetic()) && (LongAdder.class.isAssignableFrom(fld.getType()))) { + ((LongAdder)fld.get(null)).reset(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index 5b26c49..ee83657 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -18,7 +18,7 @@ */ package org.apache.hadoop.hbase.coordination; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe * {@link #isStop()} a flag indicates whether worker should finish <BR> * {@link #registerListener()} called from {@link SplitLogWorker#run()} and could register listener * for external changes in coordination (if required) <BR> - * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify coordination engine that + * {@link #endTask(SplitLogTask, LongAdder, SplitTaskDetails)} notify coordination engine that * <p> * Important methods for WALSplitterHandler: <BR> * splitting task has completed. @@ -121,7 +121,7 @@ public interface SplitLogWorkerCoordination { * @param splitTaskDetails details about log split task (specific to coordination engine being * used). 
*/ - void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); + void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails splitTaskDetails); /** * Interface for log-split tasks Used to carry implementation details in encapsulated way through http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index 1654c67..6017317 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -206,7 +206,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements if (task.unforcedResubmits.get() >= resubmitThreshold) { if (!task.resubmitThresholdReached) { task.resubmitThresholdReached = true; - SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment(); LOG.info("Skipping resubmissions of task " + path + " because threshold " + resubmitThreshold + " reached"); } @@ -215,7 +215,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements // race with heartbeat() that might be changing last_version version = task.last_version; } else { - SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_force.increment(); version = -1; } LOG.info("resubmitting task " + path); @@ -231,7 +231,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } task.setUnassigned(); rescan(Long.MAX_VALUE); - 
SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit.increment(); return true; } @@ -273,7 +273,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements .getZooKeeper() .getData(path, this.watcher, new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.increment(); } /** @@ -354,7 +354,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } private void deleteNode(String path, Long retries) { - SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_queued.increment(); // Once a task znode is ready for delete, that is it is in the TASK_DONE // state, then no one should be writing to it anymore. That is no one // will be updating the znode version any more. @@ -370,9 +370,9 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements task = details.getTasks().remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan_deleted.increment(); } - SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_missing_state_in_delete.increment(); LOG.debug("deleted task without in memory state " + path); return; } @@ -380,7 +380,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements task.status = DELETED; task.notify(); } - SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_task_deleted.increment(); } private void deleteNodeFailure(String path) { @@ -389,7 +389,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } private void createRescanSuccess(String path) { - SplitLogCounters.tot_mgr_rescan.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan.increment(); 
getDataSetWatch(path, zkretries); } @@ -416,7 +416,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode()); ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_queued.increment(); return; } @@ -434,7 +434,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements private void getDataSetWatch(String path, Long retry_count) { this.watcher.getRecoverableZooKeeper().getZooKeeper() .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.increment(); } @@ -446,7 +446,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements setDone(path, SUCCESS); return; } - SplitLogCounters.tot_mgr_null_data.incrementAndGet(); + SplitLogCounters.tot_mgr_null_data.increment(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); return; @@ -497,17 +497,17 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements Task task = details.getTasks().get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); + SplitLogCounters.tot_mgr_unacquired_orphan_done.increment(); LOG.debug("unacquired orphan task is done " + path); } } else { synchronized (task) { if (task.status == IN_PROGRESS) { if (status == SUCCESS) { - SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_success.increment(); LOG.info("Done splitting " + path); } else { - SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_err.increment(); LOG.warn("Error splitting " + path); } 
task.status = status; @@ -536,7 +536,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements private Task findOrCreateOrphanTask(String path) { return computeIfAbsent(details.getTasks(), path, Task::new, () -> { LOG.info("creating orphan task " + path); - SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); + SplitLogCounters.tot_mgr_orphan_task_acquired.increment(); }); } @@ -547,7 +547,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements LOG.info("task " + path + " acquired by " + workerName); } task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); - SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); + SplitLogCounters.tot_mgr_heartbeat.increment(); } else { // duplicate heartbeats - heartbeats w/o zk node version // changing - are possible. The timeout thread does @@ -898,7 +898,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements LOG.debug("failed to resubmit task " + path + " version changed"); return false; } catch (KeeperException e) { - SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_failed.increment(); LOG.warn("failed to resubmit " + path, e); return false; } @@ -947,7 +947,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, String name) { - SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "Create znode " + path)) { createNodeFailure(path); @@ -961,16 +961,16 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements // And all code pieces correctly handle the case of suddenly // disappearing task-znode. 
LOG.debug("found pre-existing znode " + path); - SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); + SplitLogCounters.tot_mgr_node_already_exists.increment(); } else { Long retry_count = (Long) ctx; LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_err.increment(); createNodeFailure(path); } else { - SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_retry.increment(); createNode(path, retry_count - 1); } return; @@ -988,13 +988,13 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "GetData from znode " + path)) { return; } if (rc == KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_nonode.increment(); LOG.warn("task znode " + path + " vanished or not created yet."); // ignore since we should not end up in a case where there is in-memory task, // but no znode. 
The only case is between the time task is created in-memory @@ -1011,10 +1011,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_err.increment(); getDataSetWatchFailure(path); } else { - SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_retry.increment(); getDataSetWatch(path, retry_count - 1); } return; @@ -1036,14 +1036,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx) { - SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "Delete znode " + path)) { details.getFailedDeletions().add(path); return; } if (rc != KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_err.increment(); Long retry_count = (Long) ctx; LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 354f581..a2f1799 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -156,7 +156,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements String taskpath = currentTask; if (taskpath != null && taskpath.equals(path)) { LOG.info("retrying data watch on " + path); - SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_retry.increment(); getDataSetWatchAsync(); } else { // no point setting a watch on the task which this worker is not @@ -169,7 +169,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements public void getDataSetWatchAsync() { watcher.getRecoverableZooKeeper().getZooKeeper() .getData(currentTask, watcher, new GetDataAsyncCallback(), null); - SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_queued.increment(); } void getDataSetWatchSuccess(String path, byte[] data) { @@ -221,12 +221,12 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements try { try { if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { - SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment(); return; } } catch (KeeperException e) { LOG.warn("Failed to get data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); return; } SplitLogTask slt; @@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements slt = 
SplitLogTask.parseFrom(data); } catch (DeserializationException e) { LOG.warn("Failed parse data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); return; } if (!slt.isUnassigned()) { - SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment(); return; } @@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements attemptToOwnTask(true, watcher, server.getServerName(), path, slt.getMode(), stat.getVersion()); if (currentVersion < 0) { - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment(); return; } @@ -262,7 +262,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } LOG.info("worker " + server.getServerName() + " acquired task " + path); - SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); + SplitLogCounters.tot_wkr_task_acquired.increment(); getDataSetWatchAsync(); submitTask(path, slt.getMode(), currentVersion, reportPeriod); @@ -371,11 +371,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + task); - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); return FAILED_TO_OWN_TASK; } latestZKVersion = stat.getVersion(); - SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat.increment(); return latestZKVersion; } catch (KeeperException e) { if (!isFirstTime) { @@ -392,7 +392,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements + StringUtils.stringifyException(e1)); 
Thread.currentThread().interrupt(); } - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); return FAILED_TO_OWN_TASK; } @@ -440,7 +440,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements return; } } - SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); + SplitLogCounters.tot_wkr_task_grabing.increment(); synchronized (taskReadyLock) { while (seq_start == taskReadySeq.get()) { taskReadyLock.wait(checkInterval); @@ -567,7 +567,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_result.increment(); if (rc != 0) { LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); getDataSetWatchFailure(path); @@ -588,14 +588,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements * @param ctr */ @Override - public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) { + public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) { ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details; String task = zkDetails.getTaskNode(); int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue(); try { if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { LOG.info("successfully transitioned task " + task + " to final state " + slt); - ctr.incrementAndGet(); + ctr.increment(); return; } LOG.warn("failed to transistion task " + task + " to end state " + slt @@ -609,7 +609,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } catch (KeeperException e) { LOG.warn("failed to end task, " + task + " " + slt, e); } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); + 
SplitLogCounters.tot_wkr_final_transition_failed.increment(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 56996a4..c887d0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.hfile; import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; import java.util.EnumMap; import java.util.Iterator; import java.util.List; @@ -32,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -178,13 +178,13 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { private final AtomicLong size; /** Current size of data blocks */ - private final AtomicLong dataBlockSize; + private final LongAdder dataBlockSize; /** Current number of cached elements */ private final AtomicLong elements; /** Current number of cached data block elements */ - private final AtomicLong dataBlockElements; + private final LongAdder dataBlockElements; /** Cache access count (sequential ID) */ private final AtomicLong count; @@ -321,8 +321,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { this.stats = new CacheStats(this.getClass().getSimpleName()); this.count = new AtomicLong(0); this.elements = new AtomicLong(0); - this.dataBlockElements = new AtomicLong(0); - 
this.dataBlockSize = new AtomicLong(0); + this.dataBlockElements = new LongAdder(); + this.dataBlockSize = new LongAdder(); this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this.size = new AtomicLong(this.overhead); this.hardCapacityLimitFactor = hardLimitFactor; @@ -409,7 +409,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { map.put(cacheKey, cb); long val = elements.incrementAndGet(); if (buf.getBlockType().isData()) { - dataBlockElements.incrementAndGet(); + dataBlockElements.increment(); } if (LOG.isTraceEnabled()) { long size = map.size(); @@ -462,7 +462,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { heapsize *= -1; } if (bt != null && bt.isData()) { - dataBlockSize.addAndGet(heapsize); + dataBlockSize.add(heapsize); } return size.addAndGet(heapsize); } @@ -569,7 +569,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { assertCounterSanity(size, val); } if (block.getBuffer().getBlockType().isData()) { - dataBlockElements.decrementAndGet(); + dataBlockElements.decrement(); } if (evictedByEvictionProcess) { // When the eviction of the block happened because of invalidation of HFiles, no need to @@ -844,7 +844,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { @Override public long getCurrentDataSize() { - return this.dataBlockSize.get(); + return this.dataBlockSize.sum(); } @Override @@ -864,7 +864,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { @Override public long getDataBlockCount() { - return this.dataBlockElements.get(); + return this.dataBlockElements.sum(); } EvictionThread getEvictionThread() { http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java index b4c5a44..715cd86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.shaded.com.google.common.collect.MinMaxPriorityQueue; import org.apache.commons.collections4.map.LinkedMap; @@ -347,7 +347,7 @@ public final class BucketAllocator { * @throws BucketAllocatorException */ BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, - AtomicLong realCacheSize) throws BucketAllocatorException { + LongAdder realCacheSize) throws BucketAllocatorException { this(availableSpace, bucketSizes); // each bucket has an offset, sizeindex. 
probably the buckets are too big @@ -398,7 +398,7 @@ public final class BucketAllocator { bsi.instantiateBucket(b); reconfigured[bucketNo] = true; } - realCacheSize.addAndGet(foundLen); + realCacheSize.add(foundLen); buckets[bucketNo].addAllocation(foundOffset); usedSize += buckets[bucketNo].getItemAllocationSize(); bucketSizeInfos[bucketSizeIndex].blockAllocated(b); http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index aca8d7f..26a03fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -165,13 +166,13 @@ public class BucketCache implements BlockCache, HeapSize { private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); - private final AtomicLong realCacheSize = new AtomicLong(0); - private final AtomicLong heapSize = new AtomicLong(0); + private final LongAdder realCacheSize = new LongAdder(); + private final LongAdder heapSize = new LongAdder(); /** Current number of cached elements */ - private final AtomicLong blockNumber = new AtomicLong(0); + private final LongAdder blockNumber = new LongAdder(); /** Cache access count (sequential ID) */ - private final AtomicLong 
accessCount = new AtomicLong(0); + private final AtomicLong accessCount = new AtomicLong(); private static final int DEFAULT_CACHE_WAIT_TIME = 50; // Used in test now. If the flag is false and the cache speed is very fast, @@ -469,8 +470,8 @@ public class BucketCache implements BlockCache, HeapSize { ramCache.remove(cacheKey); cacheStats.failInsert(); } else { - this.blockNumber.incrementAndGet(); - this.heapSize.addAndGet(cachedItem.heapSize()); + this.blockNumber.increment(); + this.heapSize.add(cachedItem.heapSize()); blocksByHFile.add(cacheKey); } } @@ -545,10 +546,10 @@ public class BucketCache implements BlockCache, HeapSize { @VisibleForTesting void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { bucketAllocator.freeBlock(bucketEntry.offset()); - realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + realCacheSize.add(-1 * bucketEntry.getLength()); blocksByHFile.remove(cacheKey); if (decrementBlockNumber) { - this.blockNumber.decrementAndGet(); + this.blockNumber.decrement(); } } @@ -591,8 +592,8 @@ public class BucketCache implements BlockCache, HeapSize { private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { RAMQueueEntry removedBlock = ramCache.remove(cacheKey); if (removedBlock != null) { - this.blockNumber.decrementAndGet(); - this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); + this.blockNumber.decrement(); + this.heapSize.add(-1 * removedBlock.getData().heapSize()); } return removedBlock; } @@ -689,7 +690,7 @@ public class BucketCache implements BlockCache, HeapSize { } public long getRealCacheSize() { - return this.realCacheSize.get(); + return this.realCacheSize.sum(); } private long acceptableSize() { @@ -791,7 +792,7 @@ public class BucketCache implements BlockCache, HeapSize { if (LOG.isDebugEnabled() && msgBuffer != null) { LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 
+ - StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); } long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra @@ -1016,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize { // Always remove from ramCache even if we failed adding it to the block cache above. RAMQueueEntry ramCacheEntry = ramCache.remove(key); if (ramCacheEntry != null) { - heapSize.addAndGet(-1 * entries.get(i).getData().heapSize()); + heapSize.add(-1 * entries.get(i).getData().heapSize()); } else if (bucketEntries[i] != null){ // Block should have already been evicted. Remove it and free space. ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); @@ -1195,12 +1196,12 @@ public class BucketCache implements BlockCache, HeapSize { @Override public long heapSize() { - return this.heapSize.get(); + return this.heapSize.sum(); } @Override public long size() { - return this.realCacheSize.get(); + return this.realCacheSize.sum(); } @Override @@ -1215,7 +1216,7 @@ public class BucketCache implements BlockCache, HeapSize { @Override public long getBlockCount() { - return this.blockNumber.get(); + return this.blockNumber.sum(); } @Override @@ -1438,7 +1439,7 @@ public class BucketCache implements BlockCache, HeapSize { public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, final UniqueIndexMap<Integer> deserialiserMap, - final AtomicLong realCacheSize) throws CacheFullException, IOException, + final LongAdder realCacheSize) throws CacheFullException, IOException, BucketAllocatorException { int len = data.getSerializedLength(); // This cacheable thing can't be serialized @@ -1468,7 +1469,7 @@ public class BucketCache implements BlockCache, HeapSize { throw ioe; } - realCacheSize.addAndGet(len); + realCacheSize.add(len); return bucketEntry; } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 82b8f1b..a21b48f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -52,8 +52,8 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { private int maxCapacity; // metrics (shared across all queues) - private AtomicLong numGeneralCallsDropped; - private AtomicLong numLifoModeSwitches; + private LongAdder numGeneralCallsDropped; + private LongAdder numLifoModeSwitches; // Both are in milliseconds private volatile int codelTargetDelay; @@ -76,7 +76,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { private AtomicBoolean isOverloaded = new AtomicBoolean(false); public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, - double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) { + double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) { this.maxCapacity = capacity; this.queue = new LinkedBlockingDeque<>(capacity); this.codelTargetDelay = targetDelay; @@ -112,13 
+112,13 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { CallRunner cr; while(true) { if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { - numLifoModeSwitches.incrementAndGet(); + numLifoModeSwitches.increment(); cr = queue.takeLast(); } else { cr = queue.takeFirst(); } if (needToDrop(cr)) { - numGeneralCallsDropped.incrementAndGet(); + numGeneralCallsDropped.increment(); cr.drop(); } else { return cr; @@ -135,7 +135,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { // Only count once per switch. if (!switched) { switched = true; - numLifoModeSwitches.incrementAndGet(); + numLifoModeSwitches.increment(); } cr = queue.pollLast(); } else { @@ -146,7 +146,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { return cr; } if (needToDrop(cr)) { - numGeneralCallsDropped.incrementAndGet(); + numGeneralCallsDropped.increment(); cr.drop(); } else { return cr; http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 7e35fe8..2eca304 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -248,7 +248,7 @@ public class SplitLogManager { logDirs + " for serverName=" + serverNames); FileStatus[] logfiles = getFileList(logDirs, filter); status.setStatus("Checking directory contents..."); - SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_start.increment(); LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + " for " + serverNames); long t = 
EnvironmentEdgeManager.currentTime(); @@ -278,7 +278,7 @@ public class SplitLogManager { if (batch.done != batch.installed) { batch.isDead = true; - SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_err.increment(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; @@ -302,7 +302,7 @@ public class SplitLogManager { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); } } - SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_success.increment(); } String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed @@ -474,7 +474,7 @@ public class SplitLogManager { } while (oldtask.status == FAILURE) { LOG.debug("wait for status of task " + path + " to change to DELETED"); - SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); try { oldtask.wait(); } catch (InterruptedException e) { @@ -694,7 +694,7 @@ public class SplitLogManager { } found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { - SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { resubmitted++; } else { @@ -741,7 +741,7 @@ public class SplitLogManager { } } getSplitLogManagerCoordination().checkTasks(); - SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); LOG.debug("resubmitting unassigned task(s) after timeout"); } Set<String> failedDeletions = 
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index 308e216..4309dd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -76,9 +77,9 @@ public class MobFileCache { // caches access count private final AtomicLong count = new AtomicLong(0); private long lastAccess = 0; - private final AtomicLong miss = new AtomicLong(0); + private final LongAdder miss = new LongAdder(); private long lastMiss = 0; - private final AtomicLong evictedFileCount = new AtomicLong(0); + private final LongAdder evictedFileCount = new LongAdder(); private long lastEvictedFileCount = 0; // a lock to sync the evict to guarantee the eviction occurs in sequence. 
@@ -163,7 +164,7 @@ public class MobFileCache { for (CachedMobFile evictedFile : evictedFiles) { closeFile(evictedFile); } - evictedFileCount.addAndGet(evictedFiles.size()); + evictedFileCount.add(evictedFiles.size()); } } @@ -180,7 +181,7 @@ public class MobFileCache { CachedMobFile evictedFile = map.remove(fileName); if (evictedFile != null) { evictedFile.close(); - evictedFileCount.incrementAndGet(); + evictedFileCount.increment(); } } catch (IOException e) { LOG.error("Failed to evict the file " + fileName, e); @@ -219,7 +220,7 @@ public class MobFileCache { cached = CachedMobFile.create(fs, path, conf, cacheConf); cached.open(); map.put(fileName, cached); - miss.incrementAndGet(); + miss.increment(); } } cached.open(); @@ -294,7 +295,7 @@ public class MobFileCache { * @return The count of misses to the mob file cache. */ public long getMissCount() { - return miss.get(); + return miss.sum(); } /** @@ -302,7 +303,7 @@ public class MobFileCache { * @return The number of items evicted from the mob file cache. */ public long getEvictedFileCount() { - return evictedFileCount.get(); + return evictedFileCount.sum(); } /** @@ -310,7 +311,7 @@ public class MobFileCache { * @return The hit ratio to the mob file cache. */ public double getHitRatio() { - return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get(); + return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get(); } /** @@ -318,8 +319,8 @@ public class MobFileCache { */ public void printStatistics() { long access = count.get() - lastAccess; - long missed = miss.get() - lastMiss; - long evicted = evictedFileCount.get() - lastEvictedFileCount; + long missed = miss.sum() - lastMiss; + long evicted = evictedFileCount.sum() - lastEvictedFileCount; int hitRatio = access == 0 ? 
0 : (int) (((float) (access - missed)) / (float) access * 100); LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: " + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted); http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index e818426..8d40796 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -29,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -220,7 +220,7 @@ public class ChunkCreator { /** Statistics thread */ private static final int statThreadPeriod = 60 * 5; private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); + private final LongAdder reusedChunkCount = new LongAdder(); MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { this.maxCount = maxCount; @@ -254,7 +254,7 @@ public class ChunkCreator { Chunk chunk = reclaimedChunks.poll(); if (chunk != null) { chunk.reset(); - reusedChunkCount.incrementAndGet(); + reusedChunkCount.increment(); } else { // Make a chunk iff we have not 
yet created the maxCount chunks while (true) { @@ -303,7 +303,7 @@ public class ChunkCreator { private void logStats() { if (!LOG.isDebugEnabled()) return; long created = chunkCount.get(); - long reused = reusedChunkCount.get(); + long reused = reusedChunkCount.sum(); long total = created + reused; LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + ",created chunk count=" + created http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index bff8b7f..401c0d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -281,12 +281,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final LongAdder blockedRequestsCount = new LongAdder(); // Compaction LongAdders - final AtomicLong compactionsFinished = new AtomicLong(0L); - final AtomicLong compactionsFailed = new AtomicLong(0L); - final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); - final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); - final AtomicLong compactionsQueued = new AtomicLong(0L); - final AtomicLong flushesQueued = new AtomicLong(0L); + final LongAdder compactionsFinished = new LongAdder(); + final LongAdder compactionsFailed = new LongAdder(); + final LongAdder compactionNumFilesCompacted = new LongAdder(); + final LongAdder compactionNumBytesCompacted = new LongAdder(); + final LongAdder compactionsQueued = new LongAdder(); + final LongAdder flushesQueued = new LongAdder(); private final WAL wal; private final HRegionFileSystem fs; @@ -2272,7 +2272,7 @@ public class HRegion implements 
HeapSize, PropagatingConfigurationObserver, Regi } if(fs.isFlushSucceeded()) { - flushesQueued.set(0L); + flushesQueued.reset(); } status.markComplete("Flush successful"); @@ -8100,27 +8100,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet(); // metrics - compactionsFinished.incrementAndGet(); - compactionNumFilesCompacted.addAndGet(numFiles); - compactionNumBytesCompacted.addAndGet(filesSizeCompacted); + compactionsFinished.increment(); + compactionNumFilesCompacted.add(numFiles); + compactionNumBytesCompacted.add(filesSizeCompacted); assert newValue >= 0; } public void reportCompactionRequestFailure() { - compactionsFailed.incrementAndGet(); + compactionsFailed.increment(); } public void incrementCompactionsQueuedCount() { - compactionsQueued.incrementAndGet(); + compactionsQueued.increment(); } public void decrementCompactionsQueuedCount() { - compactionsQueued.decrementAndGet(); + compactionsQueued.decrement(); } public void incrementFlushesQueuedCount() { - flushesQueued.incrementAndGet(); + flushesQueued.increment(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 75585f5..40e268d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -134,17 +134,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable @Override public long getNumFilesCompacted() { - return 
this.region.compactionNumFilesCompacted.get(); + return this.region.compactionNumFilesCompacted.sum(); } @Override public long getNumBytesCompacted() { - return this.region.compactionNumBytesCompacted.get(); + return this.region.compactionNumBytesCompacted.sum(); } @Override public long getNumCompactionsCompleted() { - return this.region.compactionsFinished.get(); + return this.region.compactionsFinished.sum(); } @Override @@ -161,17 +161,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable @Override public long getNumCompactionsFailed() { - return this.region.compactionsFailed.get(); + return this.region.compactionsFailed.sum(); } @Override public long getNumCompactionsQueued() { - return this.region.compactionsQueued.get(); + return this.region.compactionsQueued.sum(); } @Override public long getNumFlushesQueued() { - return this.region.flushesQueued.get(); + return this.region.flushesQueued.sum(); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index a41a731..18a8e25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.lang.management.MemoryType; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,11 +38,11 @@ import org.apache.hadoop.hbase.util.Pair; public class RegionServerAccounting { // memstore data size - private final AtomicLong globalMemstoreDataSize = new AtomicLong(0); + private final LongAdder globalMemstoreDataSize = new LongAdder(); // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell // POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap // overhead as well as the cell data size. Ya cell data is in on heap area only then. - private final AtomicLong globalMemstoreHeapSize = new AtomicLong(0); + private final LongAdder globalMemstoreHeapSize = new LongAdder(); // Store the edits size during replaying WAL. Use this to roll back the // global memstore size once a region opening failed. @@ -115,14 +115,14 @@ public class RegionServerAccounting { * @return the global Memstore data size in the RegionServer */ public long getGlobalMemstoreDataSize() { - return globalMemstoreDataSize.get(); + return globalMemstoreDataSize.sum(); } /** * @return the global memstore heap size in the RegionServer */ public long getGlobalMemstoreHeapSize() { - return this.globalMemstoreHeapSize.get(); + return this.globalMemstoreHeapSize.sum(); } /** @@ -130,13 +130,13 @@ public class RegionServerAccounting { * the global Memstore size */ public void incGlobalMemstoreSize(MemstoreSize memStoreSize) { - globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize()); - globalMemstoreHeapSize.addAndGet(memStoreSize.getHeapSize()); + globalMemstoreDataSize.add(memStoreSize.getDataSize()); + globalMemstoreHeapSize.add(memStoreSize.getHeapSize()); } public void decGlobalMemstoreSize(MemstoreSize memStoreSize) { - globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize()); - globalMemstoreHeapSize.addAndGet(-memStoreSize.getHeapSize()); + globalMemstoreDataSize.add(-memStoreSize.getDataSize()); + 
globalMemstoreHeapSize.add(-memStoreSize.getHeapSize()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java index 8ad150b..b204fb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java @@ -76,7 +76,7 @@ public class WALSplitterHandler extends EventHandler { SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: - SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); + SplitLogCounters.tot_wkr_preempt_task.increment(); LOG.warn("task execution preempted " + splitTaskDetails.getWALFile()); break; case ERR: http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 56517a4..875acd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -250,24 +251,24 @@ public final class Canary implements Tool { public 
static class RegionStdOutSink extends StdOutSink { - private Map<String, AtomicLong> perTableReadLatency = new HashMap<>(); - private AtomicLong writeLatency = new AtomicLong(); + private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); + private LongAdder writeLatency = new LongAdder(); - public Map<String, AtomicLong> getReadLatencyMap() { + public Map<String, LongAdder> getReadLatencyMap() { return this.perTableReadLatency; } - public AtomicLong initializeAndGetReadLatencyForTable(String tableName) { - AtomicLong initLatency = new AtomicLong(0L); + public LongAdder initializeAndGetReadLatencyForTable(String tableName) { + LongAdder initLatency = new LongAdder(); this.perTableReadLatency.put(tableName, initLatency); return initLatency; } public void initializeWriteLatency() { - this.writeLatency.set(0L); + this.writeLatency.reset(); } - public AtomicLong getWriteLatency() { + public LongAdder getWriteLatency() { return this.writeLatency; } } @@ -323,10 +324,10 @@ public final class Canary implements Tool { private TaskType taskType; private boolean rawScanEnabled; private ServerName serverName; - private AtomicLong readWriteLatency; + private LongAdder readWriteLatency; RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink, - TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) { + TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { this.connection = connection; this.region = region; this.serverName = serverName; @@ -414,7 +415,7 @@ public final class Canary implements Tool { rs.next(); } stopWatch.stop(); - this.readWriteLatency.addAndGet(stopWatch.getTime()); + this.readWriteLatency.add(stopWatch.getTime()); sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); } catch (Exception e) { sink.publishReadFailure(serverName, region, column, e); @@ -466,7 +467,7 @@ public final class Canary implements Tool { long startTime = System.currentTimeMillis(); 
table.put(put); long time = System.currentTimeMillis() - startTime; - this.readWriteLatency.addAndGet(time); + this.readWriteLatency.add(time); sink.publishWriteTiming(serverName, region, column, time); } catch (Exception e) { sink.publishWriteFailure(serverName, region, column, e); @@ -1049,7 +1050,7 @@ public final class Canary implements Tool { } this.initialized = true; for (String table : tables) { - AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table); + LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, this.rawScanEnabled, readLatency)); } @@ -1068,7 +1069,7 @@ public final class Canary implements Tool { } // sniff canary table with write operation regionSink.initializeWriteLatency(); - AtomicLong writeTableLatency = regionSink.getWriteLatency(); + LongAdder writeTableLatency = regionSink.getWriteLatency(); taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); } @@ -1080,7 +1081,7 @@ public final class Canary implements Tool { LOG.error("Sniff region failed!", e); } } - Map<String, AtomicLong> actualReadTableLatency = regionSink.getReadLatencyMap(); + Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap(); for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { String tableName = entry.getKey(); if (actualReadTableLatency.containsKey(tableName)) { @@ -1167,7 +1168,7 @@ public final class Canary implements Tool { for (HTableDescriptor table : admin.listTables()) { if (admin.isTableEnabled(table.getTableName()) && (!table.getTableName().equals(writeTableName))) { - AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString()); + LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString()); 
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency)); } } @@ -1235,7 +1236,7 @@ public final class Canary implements Tool { * @throws Exception */ private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, - ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readLatency) throws Exception { + ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s", tableName)); @@ -1254,7 +1255,7 @@ public final class Canary implements Tool { */ private static List<Future<Void>> sniff(final Admin admin, final Sink sink, HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType, - boolean rawScanEnabled, AtomicLong rwLatency) throws Exception { + boolean rawScanEnabled, LongAdder rwLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName())); http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 4f6ffd2..e789b4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; 
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; @@ -139,7 +140,7 @@ public class TestBucketWriterThread { RAMQueueEntry spiedRqe = Mockito.spy(rqe); Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), - (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any()); + (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); // Cache disabled when ioes w/o ever healing. @@ -162,7 +163,7 @@ public class TestBucketWriterThread { Mockito.doThrow(cfe). doReturn(mockedBucketEntry). when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), - (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any()); + (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); } http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 7df3086..b1d7e22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -45,6 +45,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1049,15 +1050,15 @@ public class TestDistributedLogSplitting { long waitTime = 80000; long 
endt = curt + waitTime; while (curt < endt) { - if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + - tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + - tot_wkr_preempt_task.get()) == 0) { + if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + + tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + + tot_wkr_preempt_task.sum()) == 0) { Thread.yield(); curt = System.currentTimeMillis(); } else { - assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + - tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + - tot_wkr_preempt_task.get())); + assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + + tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + + tot_wkr_preempt_task.sum())); return; } } @@ -1717,16 +1718,16 @@ public class TestDistributedLogSplitting { } } - private void waitForCounter(AtomicLong ctr, long oldval, long newval, + private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) { long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { - if (ctr.get() == oldval) { + if (ctr.sum() == oldval) { Thread.yield(); curt = System.currentTimeMillis(); } else { - assertEquals(newval, ctr.get()); + assertEquals(newval, ctr.sum()); return; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 4c7bc54..6fd3e8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import 
java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -171,12 +172,12 @@ public class TestSplitLogManager { long eval(); } - private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) + private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems) throws Exception { Expr e = new Expr() { @Override public long eval() { - return ctr.get(); + return ctr.sum(); } }; waitForCounter(e, oldval, newval, timems); @@ -199,7 +200,7 @@ public class TestSplitLogManager { private Task findOrCreateOrphanTask(String path) { return slm.tasks.computeIfAbsent(path, k -> { LOG.info("creating orphan task " + k); - SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); + SplitLogCounters.tot_mgr_orphan_task_acquired.increment(); return new Task(); }); } @@ -214,7 +215,7 @@ public class TestSplitLogManager { slm.enqueueSplitTask(name, batch); assertEquals(1, batch.installed); assertTrue(findOrCreateOrphanTask(tasknode).batch == batch); - assertEquals(1L, tot_mgr_node_create_queued.get()); + assertEquals(1L, tot_mgr_node_create_queued.sum()); LOG.debug("waiting for task node creation"); listener.waitForCreation(); @@ -286,7 +287,7 @@ public class TestSplitLogManager { Task task2 = findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); - assertEquals(1L, tot_mgr_resubmit.get()); + assertEquals(1L, tot_mgr_resubmit.sum()); assertEquals(1, task.incarnation.get()); assertEquals(0, task.unforcedResubmits.get()); assertTrue(task.isOrphan()); @@ -323,7 +324,7 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_heartbeat, 2, 3, to/2); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); Thread.sleep(to + to/2); - assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get()); + assertEquals(2L, tot_mgr_resubmit.sum() - 
tot_mgr_resubmit_force.sum()); } @Test (timeout=180000) @@ -342,10 +343,10 @@ public class TestSplitLogManager { waitForCounter(new Expr() { @Override public long eval() { - return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); + return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum()); } }, 0, 1, 5*60000); // wait long enough - Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get()); + Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum()); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); @@ -400,23 +401,23 @@ public class TestSplitLogManager { @Test (timeout=180000) public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); - assertEquals(tot_mgr_resubmit.get(), 0); + assertEquals(tot_mgr_resubmit.sum(), 0); slm = new SplitLogManager(master, conf); - assertEquals(tot_mgr_resubmit.get(), 0); + assertEquals(tot_mgr_resubmit.sum(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); - assertEquals(tot_mgr_resubmit.get(), 0); + assertEquals(tot_mgr_resubmit.sum(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - assertEquals(tot_mgr_resubmit.get(), 0); + assertEquals(tot_mgr_resubmit.sum(), 0); SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode); - assertEquals(tot_mgr_resubmit.get(), 0); + assertEquals(tot_mgr_resubmit.sum(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.checkExists(zkw, tasknode); // Could be small race here. 
- if (tot_mgr_resubmit.get() == 0) { + if (tot_mgr_resubmit.sum() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } - assertEquals(tot_mgr_resubmit.get(), 1); + assertEquals(tot_mgr_resubmit.sum(), 1); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); @@ -472,10 +473,10 @@ public class TestSplitLogManager { final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); + if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); slm.handleDeadWorker(worker1); - if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); - if (tot_mgr_resubmit_dead_server_task.get() == 0) { + if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); + if (tot_mgr_resubmit_dead_server_task.sum() == 0) { waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); } @@ -497,10 +498,10 @@ public class TestSplitLogManager { SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); + if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. - Assert.assertEquals(0, tot_mgr_resubmit.get()); + Assert.assertEquals(0, tot_mgr_resubmit.sum()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); @@ -508,7 +509,7 @@ public class TestSplitLogManager { Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). 
// It has been resubmitted - Assert.assertEquals(1, tot_mgr_resubmit.get()); + Assert.assertEquals(1, tot_mgr_resubmit.sum()); } @Test (timeout=180000) http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 18df013..1d2b038 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -151,32 +151,32 @@ public class TestSplitLogWorker { } } - private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) + private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) throws Exception { - assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval, + assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval, waitForCounterBoolean(ctr, oldval, newval, timems)); } - private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval, + private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval, long timems) throws Exception { return waitForCounterBoolean(ctr, oldval, newval, timems, true); } - private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval, + private boolean waitForCounterBoolean(final LongAdder ctr, final long 
oldval, final long newval, long timems, boolean failIfTimeout) throws Exception { long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { - return (ctr.get() >= newval); + return (ctr.sum() >= newval); } }); if( timeWaited > 0) { // when not timed out - assertEquals(newval, ctr.get()); + assertEquals(newval, ctr.sum()); } return true; } @@ -293,7 +293,7 @@ public class TestSplitLogWorker { // not it, that we fell through to the next counter in line and it was set. assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, WAIT_TIME, false) || - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));