This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f35c5ea  HBASE-24718 : Generic NamedQueue framework for multiple 
use-cases (Refactor SlowLog responses) (#2109)
f35c5ea is described below

commit f35c5eaaddf2a7b247e3ea81c31bfa51afce35fc
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Tue Jul 21 20:24:13 2020 +0530

    HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor 
SlowLog responses) (#2109)
    
    Closes #2052
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../hbase/shaded/protobuf/RequestConverter.java    |   5 +
 hbase-common/src/main/resources/hbase-default.xml  |  12 +
 .../src/main/protobuf/server/region/Admin.proto    |   6 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  28 +--
 .../hadoop/hbase/ipc/RpcServerInterface.java       |  10 +-
 .../DisruptorExceptionHandler.java                 |   2 +-
 .../hadoop/hbase/namequeues/LogEventHandler.java   | 130 ++++++++++
 .../slowlog => namequeues}/LogHandlerUtils.java    |   6 +-
 .../NamedQueuePayload.java}                        |  33 ++-
 .../NamedQueueRecorder.java}                       | 120 ++++------
 .../hadoop/hbase/namequeues/NamedQueueService.java |  69 ++++++
 .../slowlog => namequeues}/RingBufferEnvelope.java |  22 +-
 .../slowlog => namequeues}/RpcLogDetails.java      |   5 +-
 .../hbase/namequeues/SlowLogPersistentService.java |  98 ++++++++
 .../SlowLogTableOpsChore.java                      |  12 +-
 .../hbase/namequeues/impl/SlowLogQueueService.java | 264 ++++++++++++++++++++
 .../namequeues/request/NamedQueueGetRequest.java   |  65 +++++
 .../namequeues/response/NamedQueueGetResponse.java |  61 +++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |  23 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  52 ++--
 .../regionserver/slowlog/LogEventHandler.java      | 266 ---------------------
 .../TestNamedQueueRecorder.java}                   | 216 ++++++++++-------
 .../TestSlowLogAccessor.java                       |  56 +++--
 hbase-shell/src/test/ruby/hbase/admin_test.rb      |   2 +-
 .../thrift2/TestThriftHBaseServiceHandler.java     |   1 +
 25 files changed, 1027 insertions(+), 537 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 2be7ccd..7b0282a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -1817,6 +1817,11 @@ public final class RequestConverter {
     } else {
       builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
     }
+    if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
+      builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
+    } else {
+      builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
+    }
     return builder.setLimit(logQueryFilter.getLimit()).build();
   }
 
diff --git a/hbase-common/src/main/resources/hbase-default.xml 
b/hbase-common/src/main/resources/hbase-default.xml
index f50ed1c..25bfc87 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1992,4 +1992,16 @@ possible configurations would overwhelm and obscure the 
important.
       too large batch request.
     </description>
   </property>
+  <property>
+    <name>hbase.namedqueue.provider.classes</name>
+    <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
+    <description>
+      Default values for NamedQueueService implementors. This comma separated 
full class names
+      represent all implementors of NamedQueueService that we would like to be 
invoked by
+      LogEvent handler service. One example of NamedQueue service is 
SlowLogQueueService which
+      is used to store slow/large RPC logs in ringbuffer at each RegionServer.
+      All implementors of NamedQueueService should be found under package:
+      "org.apache.hadoop.hbase.namequeues.impl"
+    </description>
+  </property>
 </configuration>
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index b8cfcde..101ed1e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -288,12 +288,18 @@ message SlowLogResponseRequest {
     OR = 1;
   }
 
+  enum LogType {
+    SLOW_LOG = 0;
+    LARGE_LOG = 1;
+  }
+
   optional string region_name = 1;
   optional string table_name = 2;
   optional string client_address = 3;
   optional string user_name = 4;
   optional uint32 limit = 5 [default = 10];
   optional FilterByOperator filter_by_operator = 6 [default = OR];
+  optional LogType log_type = 7;
 }
 
 message SlowLogResponses {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 12fd584..cace5f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -94,10 +95,9 @@ public abstract class RpcServer implements 
RpcServerInterface,
   private static final String MULTI_GETS = "multi.gets";
   private static final String MULTI_MUTATIONS = "multi.mutations";
   private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
-  private static final String GET_SLOW_LOG_RESPONSES = "GetSlowLogResponses";
-  private static final String CLEAR_SLOW_LOGS_RESPONSES = 
"ClearSlowLogsResponses";
 
   private final boolean authorize;
+  private final boolean isOnlineLogProviderEnabled;
   protected boolean isSecurityEnabled;
 
   public static final byte CURRENT_VERSION = 0;
@@ -229,7 +229,7 @@ public abstract class RpcServer implements 
RpcServerInterface,
   /**
    * Use to add online slowlog responses
    */
-  private SlowLogRecorder slowLogRecorder;
+  private NamedQueueRecorder namedQueueRecorder;
 
   @FunctionalInterface
   protected interface CallCleanup {
@@ -304,6 +304,8 @@ public abstract class RpcServer implements 
RpcServerInterface,
       saslProps = Collections.emptyMap();
     }
 
+    this.isOnlineLogProviderEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
     this.scheduler = scheduler;
   }
 
@@ -432,11 +434,11 @@ public abstract class RpcServer implements 
RpcServerInterface,
           tooLarge, tooSlow,
           status.getClient(), startTime, processingTime, qTime,
           responseSize, userName);
-        if (this.slowLogRecorder != null) {
+        if (this.namedQueueRecorder != null && 
this.isOnlineLogProviderEnabled) {
           // send logs to ring buffer owned by slowLogRecorder
-          final String className = server == null ? StringUtils.EMPTY :
-            server.getClass().getSimpleName();
-          this.slowLogRecorder.addSlowLogPayload(
+          final String className =
+            server == null ? StringUtils.EMPTY : 
server.getClass().getSimpleName();
+          this.namedQueueRecorder.addRecord(
             new RpcLogDetails(call, param, status.getClient(), responseSize, 
className, tooSlow,
               tooLarge));
         }
@@ -819,12 +821,8 @@ public abstract class RpcServer implements 
RpcServerInterface,
   }
 
   @Override
-  public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
-    this.slowLogRecorder = slowLogRecorder;
+  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
+    this.namedQueueRecorder = namedQueueRecorder;
   }
 
-  @Override
-  public SlowLogRecorder getSlowLogRecorder() {
-    return slowLogRecorder;
-  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index c8a71f3..99e0188 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -102,12 +102,8 @@ public interface RpcServerInterface {
   /**
    * Set Online SlowLog Provider
    *
-   * @param slowLogRecorder instance of {@link SlowLogRecorder}
+   * @param namedQueueRecorder instance of {@link NamedQueueRecorder}
    */
-  void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
+  void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
 
-  /**
-   * @return Retrieve instance of {@link SlowLogRecorder} maintained by 
RpcServer
-   */
-  SlowLogRecorder getSlowLogRecorder();
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
similarity index 96%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
index 53a2ef1..fcaecc6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
@@ -17,7 +17,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import com.lmax.disruptor.ExceptionHandler;
 import org.apache.yetus.audience.InterfaceAudience;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
new file mode 100644
index 0000000..8b8db2f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event Handler run by disruptor ringbuffer consumer.
+ * Although this is generic implementation for namedQueue, it can have 
individual queue specific
+ * logic.
+ */
+@InterfaceAudience.Private
+class LogEventHandler implements EventHandler<RingBufferEnvelope> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LogEventHandler.class);
+
+  // Map that binds namedQueues to corresponding queue service implementation.
+  // If NamedQueue of specific type is enabled, corresponding service will be 
used to
+  // insert and retrieve records.
+  // Individual queue sizes should be determined based on their individual 
configs within
+  // each service.
+  private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> 
namedQueueServices =
+    new HashMap<>();
+
+  private static final String NAMED_QUEUE_PROVIDER_CLASSES = 
"hbase.namedqueue.provider.classes";
+
+  LogEventHandler(final Configuration conf) {
+    for (String implName : 
conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
+      Class<?> clz;
+      try {
+        clz = Class.forName(implName);
+      } catch (ClassNotFoundException e) {
+        LOG.warn("Failed to find NamedQueueService implementor class {}", 
implName, e);
+        continue;
+      }
+
+      if (!NamedQueueService.class.isAssignableFrom(clz)) {
+        LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
+        continue;
+      }
+
+      // add all service mappings here
+      try {
+        NamedQueueService namedQueueService =
+          (NamedQueueService) 
clz.getConstructor(Configuration.class).newInstance(conf);
+        namedQueueServices.put(namedQueueService.getEvent(), 
namedQueueService);
+      } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException
+          | InvocationTargetException e) {
+        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} 
to service map.",
+          clz);
+      }
+    }
+  }
+
+  /**
+   * Called when a publisher has published an event to the {@link RingBuffer}.
+   * This is generic consumer of disruptor ringbuffer and for each new 
namedQueue that we
+   * add, we should also provide specific consumer logic here.
+   *
+   * @param event published to the {@link RingBuffer}
+   * @param sequence of the event being processed
+   * @param endOfBatch flag to indicate if this is the last event in a batch 
from
+   *   the {@link RingBuffer}
+   */
+  @Override
+  public void onEvent(RingBufferEnvelope event, long sequence, boolean 
endOfBatch) {
+    final NamedQueuePayload namedQueuePayload = event.getPayload();
+    // consume ringbuffer payload based on event type
+    namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
+      .consumeEventFromDisruptor(namedQueuePayload);
+  }
+
+  /**
+   * Cleans up queues maintained by services.
+   *
+   * @param namedQueueEvent type of queue to clear
+   * @return true if queue is cleaned up, false otherwise
+   */
+  boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+    return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
+  }
+
+  /**
+   * Add all in memory queue records to system table. The implementors can use 
system table
+   * or direct HDFS file or ZK as persistence system.
+   */
+  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+    namedQueueServices.get(namedQueueEvent).persistAll();
+  }
+
+  /**
+   * Retrieve in memory queue records from ringbuffer
+   *
+   * @param request namedQueue request with event type
+   * @return queue records from ringbuffer after filter (if applied)
+   */
+  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+    return 
namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
similarity index 96%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
index f4d850f..f04cb18 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
@@ -17,7 +17,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Event Handler utility class
  */
 @InterfaceAudience.Private
-class LogHandlerUtils {
+public class LogHandlerUtils {
 
   private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest 
request) {
     int totalFilters = 0;
@@ -91,7 +91,7 @@ class LogHandlerUtils {
     return filteredSlowLogPayloads;
   }
 
-  static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
+  public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
       AdminProtos.SlowLogResponseRequest request, 
List<TooSlowLog.SlowLogPayload> logPayloadList) {
     int totalFilters = getTotalFiltersCount(request);
     if (totalFilters > 0) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
similarity index 54%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index 53a2ef1..7aa87fa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -17,34 +17,33 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
-import com.lmax.disruptor.ExceptionHandler;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Exception Handler for Online Slow Log Ring Buffer
+ * Base payload to be prepared by client to send various namedQueue events for 
in-memory
+ * ring buffer storage in either HMaster or RegionServer.
+ * e.g slowLog responses
  */
 @InterfaceAudience.Private
-class DisruptorExceptionHandler implements 
ExceptionHandler<RingBufferEnvelope> {
+public class NamedQueuePayload {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(DisruptorExceptionHandler.class);
-
-  @Override
-  public void handleEventException(Throwable e, long sequence, 
RingBufferEnvelope event) {
-    LOG.error("Sequence={}, event={}", sequence, event, e);
+  public enum NamedQueueEvent {
+    SLOW_LOG
   }
 
-  @Override
-  public void handleOnStartException(Throwable e) {
-    LOG.error("Disruptor onStartException: ", e);
+  private final NamedQueueEvent namedQueueEvent;
+
+  public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
+    if (namedQueueEvent == null) {
+      throw new RuntimeException("NamedQueuePayload with null 
namedQueueEvent");
+    }
+    this.namedQueueEvent = namedQueueEvent;
   }
 
-  @Override
-  public void handleOnShutdownException(Throwable e) {
-    LOG.error("Disruptor onShutdownException: ", e);
+  public NamedQueueEvent getNamedQueueEvent() {
+    return namedQueueEvent;
   }
 
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
similarity index 50%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index b0fb3e7..cb3512a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -17,87 +17,80 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
-
 /**
- * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in 
the ring buffer.
- * The service uses LMAX Disruptor to save slow records which are then 
consumed by
+ * NamedQueue recorder that maintains various named queues.
+ * The service uses LMAX Disruptor to save queue records which are then 
consumed by
  * a queue and based on the ring buffer size, the available records are then 
fetched
  * from the queue in thread-safe manner.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class SlowLogRecorder {
+public class NamedQueueRecorder {
 
   private final Disruptor<RingBufferEnvelope> disruptor;
   private final LogEventHandler logEventHandler;
-  private final int eventCount;
-  private final boolean isOnlineLogProviderEnabled;
 
-  private static final String SLOW_LOG_RING_BUFFER_SIZE =
-    "hbase.regionserver.slowlog.ringbuffer.size";
+  private static NamedQueueRecorder namedQueueRecorder;
+  private static boolean isInit = false;
+  private static final Object LOCK = new Object();
 
   /**
    * Initialize disruptor with configurable ringbuffer size
    */
-  public SlowLogRecorder(Configuration conf) {
-    isOnlineLogProviderEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
-      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
-
-    if (!isOnlineLogProviderEnabled) {
-      this.disruptor = null;
-      this.logEventHandler = null;
-      this.eventCount = 0;
-      return;
-    }
-
-    this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
-      HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+  private NamedQueueRecorder(Configuration conf) {
 
     // This is the 'writer' -- a single threaded executor. This single thread 
consumes what is
     // put on the ringbuffer.
     final String hostingThreadName = Thread.currentThread().getName();
 
+    int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
+
     // disruptor initialization with BlockingWaitStrategy
     this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
-      getEventCount(),
+      getEventCount(eventCount),
       Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
       ProducerType.MULTI,
       new BlockingWaitStrategy());
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
 
     // initialize ringbuffer event handler
-    final boolean isSlowLogTableEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
-      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
-    this.logEventHandler = new LogEventHandler(this.eventCount, 
isSlowLogTableEnabled, conf);
+    this.logEventHandler = new LogEventHandler(conf);
     this.disruptor.handleEventsWith(new 
LogEventHandler[]{this.logEventHandler});
     this.disruptor.start();
   }
 
+  public static NamedQueueRecorder getInstance(Configuration conf) {
+    if (namedQueueRecorder != null) {
+      return namedQueueRecorder;
+    }
+    synchronized (LOCK) {
+      if (!isInit) {
+        namedQueueRecorder = new NamedQueueRecorder(conf);
+        isInit = true;
+      }
+    }
+    return namedQueueRecorder;
+  }
+
   // must be power of 2 for disruptor ringbuffer
-  private int getEventCount() {
-    Preconditions.checkArgument(eventCount >= 0,
-      SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
+  private int getEventCount(int eventCount) {
+    Preconditions.checkArgument(eventCount >= 0, 
"hbase.namedqueue.ringbuffer.size must be > 0");
     int floor = Integer.highestOneBit(eventCount);
     if (floor == eventCount) {
       return floor;
@@ -110,66 +103,53 @@ public class SlowLogRecorder {
   }
 
   /**
-   * Retrieve online slow logs from ringbuffer
-   *
-   * @param request slow log request parameters
-   * @return online slow logs from ringbuffer
-   */
-  public List<SlowLogPayload> 
getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
-    return isOnlineLogProviderEnabled ? 
this.logEventHandler.getSlowLogPayloads(request)
-      : Collections.emptyList();
-  }
-
-  /**
-   * Retrieve online large logs from ringbuffer
+   * Retrieve in memory queue records from ringbuffer
    *
-   * @param request large log request parameters
-   * @return online large logs from ringbuffer
+   * @param request namedQueue request with event type
+   * @return queue records from ringbuffer after filter (if applied)
    */
-  public List<SlowLogPayload> 
getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
-    return isOnlineLogProviderEnabled ? 
this.logEventHandler.getLargeLogPayloads(request)
-      : Collections.emptyList();
+  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest 
request) {
+    return this.logEventHandler.getNamedQueueRecords(request);
   }
 
   /**
-   * clears slow log payloads from ringbuffer
+   * clears queue records from ringbuffer
    *
+   * @param namedQueueEvent type of queue to clear
    * @return true if slow log payloads are cleaned up or
    *   hbase.regionserver.slowlog.buffer.enabled is not set to true, false if 
failed to
    *   clean up slow logs
    */
-  public boolean clearSlowLogPayloads() {
-    if (!isOnlineLogProviderEnabled) {
-      return true;
-    }
-    return this.logEventHandler.clearSlowLogs();
+  public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent 
namedQueueEvent) {
+    return this.logEventHandler.clearNamedQueue(namedQueueEvent);
   }
 
   /**
-   * Add slow log rpcCall details to ringbuffer
+   * Add various NamedQueue records to ringbuffer. Based on the type of the 
event (e.g slowLog),
+   * consumer of disruptor ringbuffer will have specific logic.
+   * This method is producer of disruptor ringbuffer which is initialized in 
NamedQueueRecorder
+   * constructor.
    *
-   * @param rpcLogDetails all details of rpc call that would be useful for 
ring buffer
-   *   consumers
+   * @param namedQueuePayload namedQueue payload sent by client of ring buffer
+   *   service
    */
-  public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
-    if (!isOnlineLogProviderEnabled) {
-      return;
-    }
+  public void addRecord(NamedQueuePayload namedQueuePayload) {
     RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
     long seqId = ringBuffer.next();
     try {
-      ringBuffer.get(seqId).load(rpcLogDetails);
+      ringBuffer.get(seqId).load(namedQueuePayload);
     } finally {
       ringBuffer.publish(seqId);
     }
   }
 
   /**
-   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table 
in single batch
+   * Add all in memory queue records to system table. The implementors can use 
system table
+   * or direct HDFS file or ZK as persistence system.
    */
-  public void addAllLogsToSysTable() {
+  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
     if (this.logEventHandler != null) {
-      this.logEventHandler.addAllLogsToSysTable();
+      this.logEventHandler.persistAll(namedQueueEvent);
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
new file mode 100644
index 0000000..84c1b24
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * In-memory Queue service provider for multiple use-cases. Implementers 
should be
+ * registered in LogEventHandler
+ */
+@InterfaceAudience.Private
+public interface NamedQueueService {
+
+  /**
+   * Retrieve event type for NamedQueueService implementation.
+   *
+   * @return {@link NamedQueuePayload.NamedQueueEvent}
+   */
+  NamedQueuePayload.NamedQueueEvent getEvent();
+
+  /**
+   * This implementation is generic for consuming records from LMAX
+   * disruptor and inserts records to EvictingQueue which is maintained by each
+   * ringbuffer provider.
+   *
+   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
+   */
+  void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload);
+
+  /**
+   * Cleans up queues maintained by services.
+   *
+   * @return true if slow log payloads are cleaned up, false otherwise
+   */
+  boolean clearNamedQueue();
+
+  /**
+   * Retrieve in memory queue records from ringbuffer
+   *
+   * @param request namedQueue request with event type
+   * @return queue records from ringbuffer after filter (if applied)
+   */
+  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request);
+
+  /**
+   * Add all in memory queue records to system table. The implementors can use 
system table
+   * or direct HDFS file or ZK as persistence system.
+   */
+  void persistAll();
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
similarity index 68%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
index d308670..f93baaa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
@@ -17,29 +17,29 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.yetus.audience.InterfaceAudience;
 
 
 /**
- * An envelope to carry payload in the slow log ring buffer that serves as 
online buffer
- * to provide latest TooSlowLog
+ * An envelope to carry payload in the ring buffer that serves as online buffer
+ * to provide latest events
  */
 @InterfaceAudience.Private
 final class RingBufferEnvelope {
 
-  private RpcLogDetails rpcLogDetails;
+  private NamedQueuePayload namedQueuePayload;
 
   /**
    * Load the Envelope with {@link RpcCall}
    *
-   * @param rpcLogDetails all details of rpc call that would be useful for 
ring buffer
+   * @param namedQueuePayload all details of rpc call that would be useful for 
ring buffer
    *   consumers
    */
-  public void load(RpcLogDetails rpcLogDetails) {
-    this.rpcLogDetails = rpcLogDetails;
+  public void load(NamedQueuePayload namedQueuePayload) {
+    this.namedQueuePayload = namedQueuePayload;
   }
 
   /**
@@ -48,10 +48,10 @@ final class RingBufferEnvelope {
    *
    * @return Retrieve rpc log details
    */
-  public RpcLogDetails getPayload() {
-    final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
-    this.rpcLogDetails = null;
-    return rpcLogDetails;
+  public NamedQueuePayload getPayload() {
+    final NamedQueuePayload namedQueuePayload = this.namedQueuePayload;
+    this.namedQueuePayload = null;
+    return namedQueuePayload;
   }
 
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
similarity index 94%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
index b469cdb..581d1a3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
@@ -17,7 +17,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.hadoop.hbase.ipc.RpcCall;
@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * RpcCall details that would be passed on to ring buffer of slow log responses
  */
 @InterfaceAudience.Private
-public class RpcLogDetails {
+public class RpcLogDetails extends NamedQueuePayload {
 
   private final RpcCall rpcCall;
   private final Message param;
@@ -40,6 +40,7 @@ public class RpcLogDetails {
 
   public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, 
long responseSize,
       String className, boolean isSlowLog, boolean isLargeLog) {
+    super(NamedQueueEvent.SLOW_LOG);
     this.rpcCall = rpcCall;
     this.param = param;
     this.clientAddress = clientAddress;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
new file mode 100644
index 0000000..2c701ff
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Persistent service provider for Slow/LargeLog events
+ */
+@InterfaceAudience.Private
+public class SlowLogPersistentService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SlowLogPersistentService.class);
+
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private static final String SYS_TABLE_QUEUE_SIZE =
+    "hbase.regionserver.slowlog.systable.queue.size";
+  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
+  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
+
+  private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
+
+  private final Configuration configuration;
+
+  public SlowLogPersistentService(final Configuration configuration) {
+    this.configuration = configuration;
+    int sysTableQueueSize =
+      configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
+    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
+      EvictingQueue.create(sysTableQueueSize);
+    queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
+  }
+
+  public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
+    queueForSysTable.add(slowLogPayload);
+  }
+
+  /**
+   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table 
in single batch
+   */
+  public void addAllLogsToSysTable() {
+    if (queueForSysTable == null) {
+      LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. 
Exiting.");
+      return;
+    }
+    if (LOCK.isLocked()) {
+      return;
+    }
+    LOCK.lock();
+    try {
+      List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
+      int i = 0;
+      while (!queueForSysTable.isEmpty()) {
+        slowLogPayloads.add(queueForSysTable.poll());
+        i++;
+        if (i == SYSTABLE_PUT_BATCH_SIZE) {
+          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, 
this.configuration);
+          slowLogPayloads.clear();
+          i = 0;
+        }
+      }
+      if (slowLogPayloads.size() > 0) {
+        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, 
this.configuration);
+      }
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
similarity index 84%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
index 77749f7..bc892e3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
@@ -17,7 +17,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
@@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SlowLogTableOpsChore.class);
 
-  private final SlowLogRecorder slowLogRecorder;
+  private final NamedQueueRecorder namedQueueRecorder;
 
   /**
    * Chore Constructor
@@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore {
    * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, 
this chore will
    *   cancel and cleanup
    * @param period Period in millis with which this Chore repeats execution 
when scheduled
-   * @param slowLogRecorder {@link SlowLogRecorder} instance
+   * @param namedQueueRecorder {@link NamedQueueRecorder} instance
    */
   public SlowLogTableOpsChore(final Stoppable stopper, final int period,
-      final SlowLogRecorder slowLogRecorder) {
+      final NamedQueueRecorder namedQueueRecorder) {
     super("SlowLogTableOpsChore", stopper, period);
-    this.slowLogRecorder = slowLogRecorder;
+    this.namedQueueRecorder = namedQueueRecorder;
   }
 
   @Override
@@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
     if (LOG.isTraceEnabled()) {
       LOG.trace("SlowLog Table Ops Chore is starting up.");
     }
-    slowLogRecorder.addAllLogsToSysTable();
+    namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
     if (LOG.isTraceEnabled()) {
       LOG.trace("SlowLog Table Ops Chore is closing.");
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
new file mode 100644
index 0000000..f26ff51
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.NamedQueueService;
+import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
+import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory Queue service provider for Slow/LargeLog events
+ */
+@InterfaceAudience.Private
+public class SlowLogQueueService implements NamedQueueService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SlowLogQueueService.class);
+
+  private static final String SLOW_LOG_RING_BUFFER_SIZE =
+    "hbase.regionserver.slowlog.ringbuffer.size";
+
+  private final boolean isOnlineLogProviderEnabled;
+  private final boolean isSlowLogTableEnabled;
+  private final SlowLogPersistentService slowLogPersistentService;
+  private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
+
+  public SlowLogQueueService(Configuration conf) {
+    this.isOnlineLogProviderEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+
+    if (!isOnlineLogProviderEnabled) {
+      this.isSlowLogTableEnabled = false;
+      this.slowLogPersistentService = null;
+      this.slowLogQueue = null;
+      return;
+    }
+
+    // Initialize SlowLog Queue
+    int slowLogQueueSize =
+      conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, 
HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+
+    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
+      EvictingQueue.create(slowLogQueueSize);
+    slowLogQueue = Queues.synchronizedQueue(evictingQueue);
+
+    this.isSlowLogTableEnabled = 
conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+    if (isSlowLogTableEnabled) {
+      slowLogPersistentService = new SlowLogPersistentService(conf);
+    } else {
+      slowLogPersistentService = null;
+    }
+  }
+
+  @Override
+  public NamedQueuePayload.NamedQueueEvent getEvent() {
+    return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
+  }
+
+  /**
+   * This implementation is specific to slowLog event. This consumes slowLog 
event from
+   * disruptor and inserts records to EvictingQueue.
+   *
+   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
+   */
+  @Override
+  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+    if (!isOnlineLogProviderEnabled) {
+      return;
+    }
+    if (!(namedQueuePayload instanceof RpcLogDetails)) {
+      LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type 
RpcLogDetails.");
+      return;
+    }
+    final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
+    final RpcCall rpcCall = rpcLogDetails.getRpcCall();
+    final String clientAddress = rpcLogDetails.getClientAddress();
+    final long responseSize = rpcLogDetails.getResponseSize();
+    final String className = rpcLogDetails.getClassName();
+    final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
+    if (type == null) {
+      return;
+    }
+    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
+    Message param = rpcLogDetails.getParam();
+    long receiveTime = rpcCall.getReceiveTime();
+    long startTime = rpcCall.getStartTime();
+    long endTime = System.currentTimeMillis();
+    int processingTime = (int) (endTime - startTime);
+    int qTime = (int) (startTime - receiveTime);
+    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
+    int numGets = 0;
+    int numMutations = 0;
+    int numServiceCalls = 0;
+    if (param instanceof ClientProtos.MultiRequest) {
+      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
+      for (ClientProtos.RegionAction regionAction : 
multi.getRegionActionList()) {
+        for (ClientProtos.Action action : regionAction.getActionList()) {
+          if (action.hasMutation()) {
+            numMutations++;
+          }
+          if (action.hasGet()) {
+            numGets++;
+          }
+          if (action.hasServiceCall()) {
+            numServiceCalls++;
+          }
+        }
+      }
+    }
+    final String userName = 
rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
+    final String methodDescriptorName =
+      methodDescriptor != null ? methodDescriptor.getName() : 
StringUtils.EMPTY;
+    TooSlowLog.SlowLogPayload slowLogPayload = 
TooSlowLog.SlowLogPayload.newBuilder()
+      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() 
+ ")")
+      .setClientAddress(clientAddress)
+      .setMethodName(methodDescriptorName)
+      .setMultiGets(numGets)
+      .setMultiMutations(numMutations)
+      .setMultiServiceCalls(numServiceCalls)
+      .setParam(slowLogParams != null ? slowLogParams.getParams() : 
StringUtils.EMPTY)
+      .setProcessingTime(processingTime)
+      .setQueueTime(qTime)
+      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : 
StringUtils.EMPTY)
+      .setResponseSize(responseSize)
+      .setServerClass(className)
+      .setStartTime(startTime)
+      .setType(type)
+      .setUserName(userName)
+      .build();
+    slowLogQueue.add(slowLogPayload);
+    if (isSlowLogTableEnabled) {
+      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
+        slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
+      }
+    }
+  }
+
+  @Override
+  public boolean clearNamedQueue() {
+    if (!isOnlineLogProviderEnabled) {
+      return false;
+    }
+    LOG.debug("Received request to clean up online slowlog buffer.");
+    slowLogQueue.clear();
+    return true;
+  }
+
+  @Override
+  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest 
request) {
+    if (!isOnlineLogProviderEnabled) {
+      return null;
+    }
+    final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
+      request.getSlowLogResponseRequest();
+    final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
+    if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
+        .equals(slowLogResponseRequest.getLogType())) {
+      slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
+    } else {
+      slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
+    }
+    NamedQueueGetResponse response = new NamedQueueGetResponse();
+    response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    response.setSlowLogPayloads(slowLogPayloads);
+    return response;
+  }
+
+  private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails 
rpcCallDetails) {
+    final boolean isSlowLog = rpcCallDetails.isSlowLog();
+    final boolean isLargeLog = rpcCallDetails.isLargeLog();
+    final TooSlowLog.SlowLogPayload.Type type;
+    if (!isSlowLog && !isLargeLog) {
+      LOG.error("slowLog and largeLog both are false. Ignoring the event. 
rpcCallDetails: {}",
+        rpcCallDetails);
+      return null;
+    }
+    if (isSlowLog && isLargeLog) {
+      type = TooSlowLog.SlowLogPayload.Type.ALL;
+    } else if (isSlowLog) {
+      type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
+    } else {
+      type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
+    }
+    return type;
+  }
+
+  /**
+   * Add all slowLog events to system table. This is only for slowLog event's 
persistence on
+   * system table.
+   */
+  @Override
+  public void persistAll() {
+    if (!isOnlineLogProviderEnabled) {
+      return;
+    }
+    if (slowLogPersistentService != null) {
+      slowLogPersistentService.addAllLogsToSysTable();
+    }
+  }
+
+  private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
+      final AdminProtos.SlowLogResponseRequest request) {
+    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
+      Arrays.stream(slowLogQueue.toArray(new 
TooSlowLog.SlowLogPayload[0])).filter(
+        e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
+          || e.getType() == 
TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
+    // latest slow logs first, operator is interested in latest records from 
in-memory buffer
+    Collections.reverse(slowLogPayloadList);
+    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
+  }
+
+  private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
+      final AdminProtos.SlowLogResponseRequest request) {
+    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
+      Arrays.stream(slowLogQueue.toArray(new 
TooSlowLog.SlowLogPayload[0])).filter(
+        e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
+          || e.getType() == 
TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
+    // latest large logs first, operator is interested in latest records from 
in-memory buffer
+    Collections.reverse(slowLogPayloadList);
+    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
new file mode 100644
index 0000000..6e88bf4
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.request;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Request object to be used by ring buffer use-cases. Clients get records by 
sending
+ * this request object.
+ * For each ring buffer use-case, add request payload to this class, client 
should set
+ * namedQueueEvent based on use-case.
+ * Protobuf does not support inheritance, hence we need to work with
+ */
+@InterfaceAudience.Private
+public class NamedQueueGetRequest {
+
+  private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
+  private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
+
+  public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
+    return slowLogResponseRequest;
+  }
+
+  public void setSlowLogResponseRequest(
+      AdminProtos.SlowLogResponseRequest slowLogResponseRequest) {
+    this.slowLogResponseRequest = slowLogResponseRequest;
+  }
+
+  public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
+    return namedQueueEvent;
+  }
+
+  public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent 
namedQueueEvent) {
+    this.namedQueueEvent = namedQueueEvent;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("slowLogResponseRequest", slowLogResponseRequest)
+      .append("namedQueueEvent", namedQueueEvent)
+      .toString();
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
new file mode 100644
index 0000000..ee4ed43
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.response;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.util.List;
+
+/**
+ * Response object to be sent by namedQueue service back to caller
+ */
+@InterfaceAudience.Private
+public class NamedQueueGetResponse {
+
+  private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
+  private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
+
+  public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
+    return slowLogPayloads;
+  }
+
+  public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> 
slowLogPayloads) {
+    this.slowLogPayloads = slowLogPayloads;
+  }
+
+  public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
+    return namedQueueEvent;
+  }
+
+  public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent 
namedQueueEvent) {
+    this.namedQueueEvent = namedQueueEvent;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("slowLogPayloads", slowLogPayloads)
+      .append("namedQueueEvent", namedQueueEvent)
+      .toString();
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 99fddf6..2df4d83 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,8 +138,8 @@ import 
org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import 
org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -536,7 +536,7 @@ public class HRegionServer extends Thread implements
   /**
    * Provide online slow log responses from ringbuffer
    */
-  private SlowLogRecorder slowLogRecorder;
+  private NamedQueueRecorder namedQueueRecorder = null;
 
   /**
    * True if this RegionServer is coming up in a cluster where there is no 
Master;
@@ -595,7 +595,12 @@ public class HRegionServer extends Thread implements
       this.stopped = false;
 
       if (!(this instanceof HMaster)) {
-        this.slowLogRecorder = new SlowLogRecorder(this.conf);
+        final boolean isOnlineLogProviderEnabled = conf.getBoolean(
+          HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+          HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+        if (isOnlineLogProviderEnabled) {
+          this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
+        }
       }
       rpcServices = createRpcServices();
       useThisHostnameInstead = getUseThisHostnameInstead(conf);
@@ -1494,12 +1499,12 @@ public class HRegionServer extends Thread implements
   }
 
   /**
-   * get Online SlowLog Provider to add slow logs to ringbuffer
+   * get NamedQueue Provider to add different logs to ringbuffer
    *
-   * @return Online SlowLog Provider
+   * @return NamedQueueRecorder
    */
-  public SlowLogRecorder getSlowLogRecorder() {
-    return this.slowLogRecorder;
+  public NamedQueueRecorder getNamedQueueRecorder() {
+    return this.namedQueueRecorder;
   }
 
   /*
@@ -2091,7 +2096,7 @@ public class HRegionServer extends Thread implements
     if (isSlowLogTableEnabled) {
       // default chore duration: 10 min
       final int duration = 
conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
-      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, 
this.slowLogRecorder);
+      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, 
this.namedQueueRecorder);
     }
 
     // Create the thread to clean the moved regions list
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 56de741..d49bf4c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -109,6 +109,9 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
@@ -129,7 +132,7 @@ import 
org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import 
org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
 import 
org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
@@ -1305,7 +1308,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
     rpcServer.setRsRpcServices(this);
     if (!(rs instanceof HMaster)) {
-      rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
+      rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
     }
     scannerLeaseTimeoutPeriod = conf.getInt(
       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
@@ -4071,28 +4074,39 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
   @QosPriority(priority = HConstants.ADMIN_QOS)
   public SlowLogResponses getSlowLogResponses(final RpcController controller,
     final SlowLogResponseRequest request) {
-    final SlowLogRecorder slowLogRecorder =
-      this.regionServer.getSlowLogRecorder();
-    final List<SlowLogPayload> slowLogPayloads;
-    slowLogPayloads = slowLogRecorder != null
-      ? slowLogRecorder.getSlowLogPayloads(request)
-      : Collections.emptyList();
+    final NamedQueueRecorder namedQueueRecorder =
+      this.regionServer.getNamedQueueRecorder();
+    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, 
namedQueueRecorder);
     SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
       .addAllSlowLogPayloads(slowLogPayloads)
       .build();
     return slowLogResponses;
   }
 
+  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest 
request,
+      NamedQueueRecorder namedQueueRecorder) {
+    if (namedQueueRecorder == null) {
+      return Collections.emptyList();
+    }
+    List<SlowLogPayload> slowLogPayloads;
+    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+    
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    namedQueueGetRequest.setSlowLogResponseRequest(request);
+    NamedQueueGetResponse namedQueueGetResponse =
+      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+    slowLogPayloads = namedQueueGetResponse != null ?
+      namedQueueGetResponse.getSlowLogPayloads() :
+      Collections.emptyList();
+    return slowLogPayloads;
+  }
+
   @Override
   @QosPriority(priority = HConstants.ADMIN_QOS)
   public SlowLogResponses getLargeLogResponses(final RpcController controller,
       final SlowLogResponseRequest request) {
-    final SlowLogRecorder slowLogRecorder =
-      this.regionServer.getSlowLogRecorder();
-    final List<SlowLogPayload> slowLogPayloads;
-    slowLogPayloads = slowLogRecorder != null
-      ? slowLogRecorder.getLargeLogPayloads(request)
-      : Collections.emptyList();
+    final NamedQueueRecorder namedQueueRecorder =
+      this.regionServer.getNamedQueueRecorder();
+    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, 
namedQueueRecorder);
     SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
       .addAllSlowLogPayloads(slowLogPayloads)
       .build();
@@ -4103,10 +4117,12 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
   @QosPriority(priority = HConstants.ADMIN_QOS)
   public ClearSlowLogResponses clearSlowLogsResponses(final RpcController 
controller,
     final ClearSlowLogResponseRequest request) {
-    final SlowLogRecorder slowLogRecorder =
-      this.regionServer.getSlowLogRecorder();
-    boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
-      .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
+    final NamedQueueRecorder namedQueueRecorder =
+      this.regionServer.getNamedQueueRecorder();
+    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
+      .map(queueRecorder ->
+        
queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
+      .orElse(false);
     ClearSlowLogResponses clearSlowLogResponses = 
ClearSlowLogResponses.newBuilder()
       .setIsCleaned(slowLogsCleaned)
       .build();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
deleted file mode 100644
index 9c147e3..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.slowlog;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.SlowLogParams;
-import org.apache.hadoop.hbase.ipc.RpcCall;
-import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
-import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
-
-/**
- * Event Handler run by disruptor ringbuffer consumer
- */
-@InterfaceAudience.Private
-class LogEventHandler implements EventHandler<RingBufferEnvelope> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(LogEventHandler.class);
-
-  private static final String SYS_TABLE_QUEUE_SIZE =
-    "hbase.regionserver.slowlog.systable.queue.size";
-  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
-  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
-
-  private final Queue<SlowLogPayload> queueForRingBuffer;
-  private final Queue<SlowLogPayload> queueForSysTable;
-  private final boolean isSlowLogTableEnabled;
-
-  private Configuration configuration;
-
-  private static final ReentrantLock LOCK = new ReentrantLock();
-
-  LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration 
conf) {
-    this.configuration = conf;
-    EvictingQueue<SlowLogPayload> evictingQueue = 
EvictingQueue.create(eventCount);
-    queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
-    this.isSlowLogTableEnabled = isSlowLogTableEnabled;
-    if (isSlowLogTableEnabled) {
-      int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, 
DEFAULT_SYS_TABLE_QUEUE_SIZE);
-      EvictingQueue<SlowLogPayload> evictingQueueForTable =
-        EvictingQueue.create(sysTableQueueSize);
-      queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
-    } else {
-      queueForSysTable = null;
-    }
-  }
-
-  /**
-   * Called when a publisher has published an event to the {@link RingBuffer}
-   *
-   * @param event published to the {@link RingBuffer}
-   * @param sequence of the event being processed
-   * @param endOfBatch flag to indicate if this is the last event in a batch 
from
-   *   the {@link RingBuffer}
-   * @throws Exception if the EventHandler would like the exception handled 
further up the chain
-   */
-  @Override
-  public void onEvent(RingBufferEnvelope event, long sequence, boolean 
endOfBatch)
-      throws Exception {
-    final RpcLogDetails rpcCallDetails = event.getPayload();
-    final RpcCall rpcCall = rpcCallDetails.getRpcCall();
-    final String clientAddress = rpcCallDetails.getClientAddress();
-    final long responseSize = rpcCallDetails.getResponseSize();
-    final String className = rpcCallDetails.getClassName();
-    final SlowLogPayload.Type type = getLogType(rpcCallDetails);
-    if (type == null) {
-      return;
-    }
-    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
-    Message param = rpcCallDetails.getParam();
-    long receiveTime = rpcCall.getReceiveTime();
-    long startTime = rpcCall.getStartTime();
-    long endTime = System.currentTimeMillis();
-    int processingTime = (int) (endTime - startTime);
-    int qTime = (int) (startTime - receiveTime);
-    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
-    int numGets = 0;
-    int numMutations = 0;
-    int numServiceCalls = 0;
-    if (param instanceof ClientProtos.MultiRequest) {
-      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
-      for (ClientProtos.RegionAction regionAction : 
multi.getRegionActionList()) {
-        for (ClientProtos.Action action : regionAction.getActionList()) {
-          if (action.hasMutation()) {
-            numMutations++;
-          }
-          if (action.hasGet()) {
-            numGets++;
-          }
-          if (action.hasServiceCall()) {
-            numServiceCalls++;
-          }
-        }
-      }
-    }
-    final String userName = 
rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
-    final String methodDescriptorName =
-      methodDescriptor != null ? methodDescriptor.getName() : 
StringUtils.EMPTY;
-    SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
-      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() 
+ ")")
-      .setClientAddress(clientAddress)
-      .setMethodName(methodDescriptorName)
-      .setMultiGets(numGets)
-      .setMultiMutations(numMutations)
-      .setMultiServiceCalls(numServiceCalls)
-      .setParam(slowLogParams != null ? slowLogParams.getParams() : 
StringUtils.EMPTY)
-      .setProcessingTime(processingTime)
-      .setQueueTime(qTime)
-      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : 
StringUtils.EMPTY)
-      .setResponseSize(responseSize)
-      .setServerClass(className)
-      .setStartTime(startTime)
-      .setType(type)
-      .setUserName(userName)
-      .build();
-    queueForRingBuffer.add(slowLogPayload);
-    if (isSlowLogTableEnabled) {
-      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
-        queueForSysTable.add(slowLogPayload);
-      }
-    }
-  }
-
-  private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
-    final boolean isSlowLog = rpcCallDetails.isSlowLog();
-    final boolean isLargeLog = rpcCallDetails.isLargeLog();
-    final SlowLogPayload.Type type;
-    if (!isSlowLog && !isLargeLog) {
-      LOG.error("slowLog and largeLog both are false. Ignoring the event. 
rpcCallDetails: {}",
-        rpcCallDetails);
-      return null;
-    }
-    if (isSlowLog && isLargeLog) {
-      type = SlowLogPayload.Type.ALL;
-    } else if (isSlowLog) {
-      type = SlowLogPayload.Type.SLOW_LOG;
-    } else {
-      type = SlowLogPayload.Type.LARGE_LOG;
-    }
-    return type;
-  }
-
-  /**
-   * Cleans up slow log payloads
-   *
-   * @return true if slow log payloads are cleaned up, false otherwise
-   */
-  boolean clearSlowLogs() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received request to clean up online slowlog buffer..");
-    }
-    queueForRingBuffer.clear();
-    return true;
-  }
-
-  /**
-   * Retrieve list of slow log payloads
-   *
-   * @param request slow log request parameters
-   * @return list of slow log payloads
-   */
-  List<SlowLogPayload> getSlowLogPayloads(final 
AdminProtos.SlowLogResponseRequest request) {
-    List<SlowLogPayload> slowLogPayloadList =
-      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
-        .filter(e -> e.getType() == SlowLogPayload.Type.ALL
-          || e.getType() == SlowLogPayload.Type.SLOW_LOG)
-        .collect(Collectors.toList());
-
-    // latest slow logs first, operator is interested in latest records from 
in-memory buffer
-    Collections.reverse(slowLogPayloadList);
-
-    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
-  }
-
-  /**
-   * Retrieve list of large log payloads
-   *
-   * @param request large log request parameters
-   * @return list of large log payloads
-   */
-  List<SlowLogPayload> getLargeLogPayloads(final 
AdminProtos.SlowLogResponseRequest request) {
-    List<SlowLogPayload> slowLogPayloadList =
-      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
-        .filter(e -> e.getType() == SlowLogPayload.Type.ALL
-          || e.getType() == SlowLogPayload.Type.LARGE_LOG)
-        .collect(Collectors.toList());
-
-    // latest large logs first, operator is interested in latest records from 
in-memory buffer
-    Collections.reverse(slowLogPayloadList);
-
-    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
-  }
-
-  /**
-   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table 
in single batch
-   */
-  void addAllLogsToSysTable() {
-    if (queueForSysTable == null) {
-      // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
-      return;
-    }
-    if (LOCK.isLocked()) {
-      return;
-    }
-    LOCK.lock();
-    try {
-      List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
-      int i = 0;
-      while (!queueForSysTable.isEmpty()) {
-        slowLogPayloads.add(queueForSysTable.poll());
-        i++;
-        if (i == SYSTABLE_PUT_BATCH_SIZE) {
-          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, 
this.configuration);
-          slowLogPayloads.clear();
-          i = 0;
-        }
-      }
-      if (slowLogPayloads.size() > 0) {
-        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, 
this.configuration);
-      }
-    } finally {
-      LOCK.unlock();
-    }
-  }
-
-}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
similarity index 76%
rename from 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
rename to 
hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index f90bbc0..542efc3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -17,12 +17,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -61,27 +65,25 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo
  * Tests for Online SlowLog Provider Service
  */
 @Category({MasterTests.class, MediumTests.class})
-public class TestSlowLogRecorder {
+public class TestNamedQueueRecorder {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
+    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestSlowLogRecorder.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestNamedQueueRecorder.class);
 
   private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new 
HBaseTestingUtility();
 
-  private SlowLogRecorder slowLogRecorder;
+  private NamedQueueRecorder namedQueueRecorder;
 
   private static int i = 0;
 
   private static Configuration applySlowLogRecorderConf(int eventSize) {
-
     Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
     conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
     conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
     return conf;
-
   }
 
   /**
@@ -90,11 +92,10 @@ public class TestSlowLogRecorder {
    *
    * @param i index of ringbuffer logs
    * @param j data value that was put on index i
-   * @param slowLogPayloads list of payload retrieved from {@link 
SlowLogRecorder}
+   * @param slowLogPayloads list of payload retrieved from {@link 
NamedQueueRecorder}
    * @return if actual values are as per expectations
    */
   private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> 
slowLogPayloads) {
-
     boolean isClientExpected = 
slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
     boolean isUserExpected = 
slowLogPayloads.get(i).getUserName().equals("userName_" + j);
     boolean isClassExpected = 
slowLogPayloads.get(i).getServerClass().equals("class_" + j);
@@ -102,15 +103,18 @@ public class TestSlowLogRecorder {
   }
 
   @Test
-  public void testOnlieSlowLogConsumption() throws Exception {
+  public void testOnlieSlowLogConsumption() throws Exception{
 
     Configuration conf = applySlowLogRecorderConf(8);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
 
-    slowLogRecorder.clearSlowLogPayloads();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
 
     int i = 0;
@@ -119,12 +123,12 @@ public class TestSlowLogRecorder {
     for (; i < 5; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
-    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
-    List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+    Assert.assertNotEquals(-1,
+      HBASE_TESTING_UTILITY.waitFor(3000, () -> 
getSlowLogPayloads(request).size() == 5));
+    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
     Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
     Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
     Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
@@ -135,15 +139,15 @@ public class TestSlowLogRecorder {
     for (; i < 7; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
+      () -> getSlowLogPayloads(request).size() == 7));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloadsList = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
         return slowLogPayloadsList.size() == 7
           && confirmPayloadParams(0, 7, slowLogPayloadsList)
           && confirmPayloadParams(5, 2, slowLogPayloadsList)
@@ -155,15 +159,15 @@ public class TestSlowLogRecorder {
     for (; i < 10; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+      () -> getSlowLogPayloads(request).size() == 8));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloadsList = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
         // confirm ringbuffer is full
         return slowLogPayloadsList.size() == 8
           && confirmPayloadParams(7, 3, slowLogPayloadsList)
@@ -176,15 +180,15 @@ public class TestSlowLogRecorder {
     for (; i < 14; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+      () -> getSlowLogPayloads(request).size() == 8));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloadsList = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
         // confirm ringbuffer is full
         // and ordered events
         return slowLogPayloadsList.size() == 8
@@ -195,9 +199,14 @@ public class TestSlowLogRecorder {
       })
     );
 
+    AdminProtos.SlowLogResponseRequest largeLogRequest =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(15)
+        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+        .build();
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloadsList = 
slowLogRecorder.getLargeLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloadsList = 
getSlowLogPayloads(largeLogRequest);
         // confirm ringbuffer is full
         // and ordered events
         return slowLogPayloadsList.size() == 8
@@ -210,11 +219,12 @@ public class TestSlowLogRecorder {
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+        boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
+          NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
 
         LOG.debug("cleared the ringbuffer of Online Slow Log records");
 
-        List<SlowLogPayload> slowLogPayloadsList = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
         // confirm ringbuffer is empty
         return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
       })
@@ -222,30 +232,43 @@ public class TestSlowLogRecorder {
 
   }
 
+  private List<SlowLogPayload> 
getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
+    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+    
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    namedQueueGetRequest.setSlowLogResponseRequest(request);
+    NamedQueueGetResponse namedQueueGetResponse =
+      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+    return namedQueueGetResponse == null ?
+      Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
+  }
+
   @Test
   public void testOnlineSlowLogWithHighRecords() throws Exception {
 
     Configuration conf = applySlowLogRecorderConf(14);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 
11).build();
 
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
 
     for (int i = 0; i < 14 * 11; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
     LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 
records");
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+      () -> getSlowLogPayloads(request).size() == 14));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
 
         // confirm strict order of slow log payloads
         return slowLogPayloads.size() == 14
@@ -266,37 +289,37 @@ public class TestSlowLogRecorder {
       })
     );
 
-    boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+    boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
+      NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
     Assert.assertTrue(isRingBufferCleaned);
     LOG.debug("cleared the ringbuffer of Online Slow Log records");
-    List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
 
     // confirm ringbuffer is empty
     Assert.assertEquals(slowLogPayloads.size(), 0);
-
   }
 
   @Test
   public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
-
     Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
     conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
 
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().build();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
-
     for (int i = 0; i < 300; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
-
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
         return slowLogPayloads.size() == 0;
       })
     );
@@ -305,56 +328,58 @@ public class TestSlowLogRecorder {
 
   @Test
   public void testOnlineSlowLogWithDisableConfig() throws Exception {
-
     Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
     conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
 
-    slowLogRecorder = new SlowLogRecorder(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().build();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
-
     for (int i = 0; i < 300; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
-
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
         return slowLogPayloads.size() == 0;
       })
     );
     conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
-
   }
 
   @Test
   public void testSlowLogFilters() throws Exception {
 
     Configuration conf = applySlowLogRecorderConf(30);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder()
         .setLimit(15)
         .setUserName("userName_87")
         .build();
 
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
 
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
 
     for (int i = 0; i < 100; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
     LOG.debug("Added 100 records, ringbuffer should only 1 record with 
matching filter");
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
+      () -> getSlowLogPayloads(request).size() == 1));
 
     AdminProtos.SlowLogResponseRequest requestClient =
       AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -362,25 +387,32 @@ public class TestSlowLogRecorder {
         .setClientAddress("client_85")
         .build();
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
+      () -> getSlowLogPayloads(requestClient).size() == 1));
 
     AdminProtos.SlowLogResponseRequest requestSlowLog =
       AdminProtos.SlowLogResponseRequest.newBuilder()
         .setLimit(15)
         .build();
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
-
+      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
   }
 
   @Test
   public void testConcurrentSlowLogEvents() throws Exception {
 
     Configuration conf = applySlowLogRecorderConf(50000);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    AdminProtos.SlowLogResponseRequest largeLogRequest =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(500000)
+        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+        .build();
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
 
     for (int j = 0; j < 1000; j++) {
@@ -389,7 +421,7 @@ public class TestSlowLogRecorder {
         for (int i = 0; i < 3500; i++) {
           RpcLogDetails rpcLogDetails =
             getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 
"class_" + (i + 1));
-          slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+          namedQueueRecorder.addRecord(rpcLogDetails);
         }
       });
 
@@ -397,22 +429,24 @@ public class TestSlowLogRecorder {
 
     Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 
-    slowLogRecorder.clearSlowLogPayloads();
-
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
-      5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
+      5000, () -> getSlowLogPayloads(request).size() > 10000));
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
-      5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 
10000));
+      5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
   }
 
   @Test
   public void testSlowLargeLogEvents() throws Exception {
     Configuration conf = applySlowLogRecorderConf(28);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
+
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 
11).build();
 
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
     LOG.debug("Initially ringbuffer of Slow Log records is empty");
 
     boolean isSlowLog;
@@ -428,16 +462,16 @@ public class TestSlowLogRecorder {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1),
           isSlowLog, isLargeLog);
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
     LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 
records");
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+      () -> getSlowLogPayloads(request).size() == 14));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> slowLogPayloads = 
slowLogRecorder.getSlowLogPayloads(request);
+        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
 
         // confirm strict order of slow log payloads
         return slowLogPayloads.size() == 14
@@ -458,12 +492,18 @@ public class TestSlowLogRecorder {
       })
     );
 
+    AdminProtos.SlowLogResponseRequest largeLogRequest =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(14 * 11)
+        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+        .build();
+
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
+      () -> getSlowLogPayloads(largeLogRequest).size() == 14));
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
       () -> {
-        List<SlowLogPayload> largeLogPayloads = 
slowLogRecorder.getLargeLogPayloads(request);
+        List<SlowLogPayload> largeLogPayloads = 
getSlowLogPayloads(largeLogRequest);
 
         // confirm strict order of slow log payloads
         return largeLogPayloads.size() == 14
@@ -483,14 +523,16 @@ public class TestSlowLogRecorder {
           && confirmPayloadParams(13, 128, largeLogPayloads);
       })
     );
-
   }
 
   @Test
   public void testSlowLogMixedFilters() throws Exception {
 
     Configuration conf = applySlowLogRecorderConf(30);
-    slowLogRecorder = new SlowLogRecorder(conf);
+    Constructor<NamedQueueRecorder> constructor =
+      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+    constructor.setAccessible(true);
+    namedQueueRecorder = constructor.newInstance(conf);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder()
         .setLimit(15)
@@ -498,23 +540,23 @@ public class TestSlowLogRecorder {
         .setClientAddress("client_88")
         .build();
 
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
 
     for (int i = 0; i < 100; i++) {
       RpcLogDetails rpcLogDetails =
         getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
+      () -> getSlowLogPayloads(request).size() == 2));
 
     AdminProtos.SlowLogResponseRequest request2 = 
AdminProtos.SlowLogResponseRequest.newBuilder()
       .setLimit(15)
       .setUserName("userName_1")
       .setClientAddress("client_2")
       .build();
-    Assert.assertEquals(0, 
slowLogRecorder.getSlowLogPayloads(request2).size());
+    Assert.assertEquals(0, getSlowLogPayloads(request2).size());
 
     AdminProtos.SlowLogResponseRequest request3 =
       AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -523,7 +565,7 @@ public class TestSlowLogRecorder {
         .setClientAddress("client_88")
         
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
         .build();
-    Assert.assertEquals(0, 
slowLogRecorder.getSlowLogPayloads(request3).size());
+    Assert.assertEquals(0, getSlowLogPayloads(request3).size());
 
     AdminProtos.SlowLogResponseRequest request4 =
       AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -532,7 +574,7 @@ public class TestSlowLogRecorder {
         .setClientAddress("client_87")
         
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
         .build();
-    Assert.assertEquals(1, 
slowLogRecorder.getSlowLogPayloads(request4).size());
+    Assert.assertEquals(1, getSlowLogPayloads(request4).size());
 
     AdminProtos.SlowLogResponseRequest request5 =
       AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -541,14 +583,14 @@ public class TestSlowLogRecorder {
         .setClientAddress("client_89")
         
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
         .build();
-    Assert.assertEquals(2, 
slowLogRecorder.getSlowLogPayloads(request5).size());
+    Assert.assertEquals(2, getSlowLogPayloads(request5).size());
 
     AdminProtos.SlowLogResponseRequest requestSlowLog =
       AdminProtos.SlowLogResponseRequest.newBuilder()
         .setLimit(15)
         .build();
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
-      () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
+      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
   }
 
   static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, 
String className) {
@@ -762,26 +804,20 @@ public class TestSlowLogRecorder {
   private static Optional<User> getUser(String userName) {
 
     return Optional.of(new User() {
-
-
       @Override
       public String getShortName() {
         return userName;
       }
 
-
       @Override
       public <T> T runAs(PrivilegedAction<T> action) {
         return null;
       }
 
-
       @Override
-      public <T> T runAs(PrivilegedExceptionAction<T> action) throws
-          IOException, InterruptedException {
+      public <T> T runAs(PrivilegedExceptionAction<T> action) {
         return null;
       }
-
     });
 
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
similarity index 76%
rename from 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
rename to 
hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
index e08ad29..a914796 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
@@ -17,10 +17,11 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +34,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
 import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -59,11 +63,11 @@ public class TestSlowLogAccessor {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestSlowLogRecorder.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestNamedQueueRecorder.class);
 
   private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new 
HBaseTestingUtility();
 
-  private SlowLogRecorder slowLogRecorder;
+  private NamedQueueRecorder namedQueueRecorder;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -88,9 +92,19 @@ public class TestSlowLogAccessor {
   @Before
   public void setUp() throws Exception {
     HRegionServer hRegionServer = 
HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
-    Field slowLogRecorder = 
HRegionServer.class.getDeclaredField("slowLogRecorder");
+    Field slowLogRecorder = 
HRegionServer.class.getDeclaredField("namedQueueRecorder");
     slowLogRecorder.setAccessible(true);
-    this.slowLogRecorder = (SlowLogRecorder) 
slowLogRecorder.get(hRegionServer);
+    this.namedQueueRecorder = (NamedQueueRecorder) 
slowLogRecorder.get(hRegionServer);
+  }
+
+  private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
+      AdminProtos.SlowLogResponseRequest request) {
+    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+    
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    namedQueueGetRequest.setSlowLogResponseRequest(request);
+    NamedQueueGetResponse namedQueueGetResponse =
+      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+    return namedQueueGetResponse.getSlowLogPayloads();
   }
 
   @Test
@@ -99,42 +113,42 @@ public class TestSlowLogAccessor {
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
 
-    slowLogRecorder.clearSlowLogPayloads();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
 
     int i = 0;
 
     Connection connection = waitForSlowLogTableCreation();
     // add 5 records initially
     for (; i < 5; i++) {
-      RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+      RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
         .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     // add 2 more records
     for (; i < 7; i++) {
-      RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+      RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
         .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     // add 3 more records
     for (; i < 10; i++) {
-      RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+      RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
         .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     // add 4 more records
     for (; i < 14; i++) {
-      RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+      RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
         .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" 
+ (i + 1));
-      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+      namedQueueRecorder.addRecord(rpcLogDetails);
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
-      .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() 
== 14));
+      .waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
 
     Assert.assertNotEquals(-1,
       HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 
14));
@@ -170,10 +184,10 @@ public class TestSlowLogAccessor {
   public void testHigherSlowLogs() throws Exception {
     Connection connection = waitForSlowLogTableCreation();
 
-    slowLogRecorder.clearSlowLogPayloads();
+    
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
     AdminProtos.SlowLogResponseRequest request =
       AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
-    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
 
     for (int j = 0; j < 100; j++) {
       CompletableFuture.runAsync(() -> {
@@ -181,15 +195,15 @@ public class TestSlowLogAccessor {
           if (i == 300) {
             Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
           }
-          RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+          RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
             .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 
"class_" + (i + 1));
-          slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+          namedQueueRecorder.addRecord(rpcLogDetails);
         }
       });
     }
 
     Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
-      int count = slowLogRecorder.getSlowLogPayloads(request).size();
+      int count = getSlowLogPayloads(request).size();
       LOG.debug("RingBuffer records count: {}", count);
       return count > 2000;
     }));
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb 
b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 3a0cb90..11798c3 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -404,7 +404,7 @@ module Hbase
 
     define_test 'clear slowlog responses should work' do
       output = capture_stdout { command(:clear_slowlog_responses, nil) }
-      assert(output.include?('Cleared Slowlog responses from 1/1 
RegionServers'))
+      assert(output.include?('Cleared Slowlog responses from 0/1 
RegionServers'))
     end
 
     
#-------------------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
 
b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
index b1c71f8..b853431 100644
--- 
a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
+++ 
b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
@@ -200,6 +200,7 @@ public class TestThriftHBaseServiceHandler {
   @BeforeClass
   public static void beforeClass() throws Exception {
     UTIL.getConfiguration().set("hbase.client.retries.number", "3");
+    
UTIL.getConfiguration().setBoolean("hbase.regionserver.slowlog.buffer.enabled", 
true);
     UTIL.startMiniCluster();
     TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
       new 
TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(tableAname));

Reply via email to