This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push: new f35c5ea HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2109) f35c5ea is described below commit f35c5eaaddf2a7b247e3ea81c31bfa51afce35fc Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Tue Jul 21 20:24:13 2020 +0530 HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2109) Closes #2052 Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> --- .../hbase/shaded/protobuf/RequestConverter.java | 5 + hbase-common/src/main/resources/hbase-default.xml | 12 + .../src/main/protobuf/server/region/Admin.proto | 6 + .../org/apache/hadoop/hbase/ipc/RpcServer.java | 28 +-- .../hadoop/hbase/ipc/RpcServerInterface.java | 10 +- .../DisruptorExceptionHandler.java | 2 +- .../hadoop/hbase/namequeues/LogEventHandler.java | 130 ++++++++++ .../slowlog => namequeues}/LogHandlerUtils.java | 6 +- .../NamedQueuePayload.java} | 33 ++- .../NamedQueueRecorder.java} | 120 ++++------ .../hadoop/hbase/namequeues/NamedQueueService.java | 69 ++++++ .../slowlog => namequeues}/RingBufferEnvelope.java | 22 +- .../slowlog => namequeues}/RpcLogDetails.java | 5 +- .../hbase/namequeues/SlowLogPersistentService.java | 98 ++++++++ .../SlowLogTableOpsChore.java | 12 +- .../hbase/namequeues/impl/SlowLogQueueService.java | 264 ++++++++++++++++++++ .../namequeues/request/NamedQueueGetRequest.java | 65 +++++ .../namequeues/response/NamedQueueGetResponse.java | 61 +++++ .../hadoop/hbase/regionserver/HRegionServer.java | 23 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 52 ++-- .../regionserver/slowlog/LogEventHandler.java | 266 --------------------- .../TestNamedQueueRecorder.java} | 216 ++++++++++------- .../TestSlowLogAccessor.java | 56 +++-- hbase-shell/src/test/ruby/hbase/admin_test.rb | 2 +- .../thrift2/TestThriftHBaseServiceHandler.java | 1 + 25 files changed, 1027 insertions(+), 537 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 2be7ccd..7b0282a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1817,6 +1817,11 @@ public final class RequestConverter { } else { builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR); } + if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) { + builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG); + } else { + builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG); + } return builder.setLimit(logQueryFilter.getLimit()).build(); } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f50ed1c..25bfc87 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1992,4 +1992,16 @@ possible configurations would overwhelm and obscure the important. too large batch request. </description> </property> + <property> + <name>hbase.namedqueue.provider.classes</name> + <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value> + <description> + Default values for NamedQueueService implementors. This comma separated full class names + represent all implementors of NamedQueueService that we would like to be invoked by + LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which + is used to store slow/large RPC logs in ringbuffer at each RegionServer. + All implementors of NamedQueueService should be found under package: + "org.apache.hadoop.hbase.namequeues.impl" + </description> + </property> </configuration> diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index b8cfcde..101ed1e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -288,12 +288,18 @@ message SlowLogResponseRequest { OR = 1; } + enum LogType { + SLOW_LOG = 0; + LARGE_LOG = 1; + } + optional string region_name = 1; optional string table_name = 2; optional string client_address = 3; optional string user_name = 4; optional uint32 limit = 5 [default = 10]; optional FilterByOperator filter_by_operator = 6 [default = OR]; + optional LogType log_type = 7; } message SlowLogResponses { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 12fd584..cace5f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; @@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails; -import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; +import org.apache.hadoop.hbase.namequeues.RpcLogDetails; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -94,10 +95,9 @@ public abstract class RpcServer implements RpcServerInterface, private static final String MULTI_GETS = "multi.gets"; private static final String MULTI_MUTATIONS = "multi.mutations"; private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; - private static final String GET_SLOW_LOG_RESPONSES = "GetSlowLogResponses"; - private static final String CLEAR_SLOW_LOGS_RESPONSES = "ClearSlowLogsResponses"; private final boolean authorize; + private final boolean isOnlineLogProviderEnabled; protected boolean isSecurityEnabled; public static final byte CURRENT_VERSION = 0; @@ -229,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface, /** * Use to add online slowlog responses */ - private SlowLogRecorder slowLogRecorder; + private NamedQueueRecorder namedQueueRecorder; @FunctionalInterface protected interface CallCleanup { @@ -304,6 +304,8 @@ public abstract class RpcServer implements RpcServerInterface, saslProps = Collections.emptyMap(); } + this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); this.scheduler = scheduler; } @@ -432,11 +434,11 @@ public abstract class RpcServer implements RpcServerInterface, tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, userName); - if (this.slowLogRecorder != null) { + if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { // send logs to ring buffer owned by slowLogRecorder - final String className = server == null ? StringUtils.EMPTY : - server.getClass().getSimpleName(); - this.slowLogRecorder.addSlowLogPayload( + final String className = + server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); + this.namedQueueRecorder.addRecord( new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow, tooLarge)); } @@ -819,12 +821,8 @@ public abstract class RpcServer implements RpcServerInterface, } @Override - public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) { - this.slowLogRecorder = slowLogRecorder; + public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) { + this.namedQueueRecorder = namedQueueRecorder; } - @Override - public SlowLogRecorder getSlowLogRecorder() { - return slowLogRecorder; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index c8a71f3..99e0188 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -102,12 +102,8 @@ public interface RpcServerInterface { /** * Set Online SlowLog Provider * - * @param slowLogRecorder instance of {@link SlowLogRecorder} + * @param namedQueueRecorder instance of {@link NamedQueueRecorder} */ - void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder); + void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder); - /** - * @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer - */ - SlowLogRecorder getSlowLogRecorder(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java similarity index 96% copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java index 53a2ef1..fcaecc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import com.lmax.disruptor.ExceptionHandler; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java new file mode 100644 index 0000000..8b8db2f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Event Handler run by disruptor ringbuffer consumer. + * Although this is generic implementation for namedQueue, it can have individual queue specific + * logic. + */ +@InterfaceAudience.Private +class LogEventHandler implements EventHandler<RingBufferEnvelope> { + + private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); + + // Map that binds namedQueues to corresponding queue service implementation. + // If NamedQueue of specific type is enabled, corresponding service will be used to + // insert and retrieve records. + // Individual queue sizes should be determined based on their individual configs within + // each service. + private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices = + new HashMap<>(); + + private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes"; + + LogEventHandler(final Configuration conf) { + for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) { + Class<?> clz; + try { + clz = Class.forName(implName); + } catch (ClassNotFoundException e) { + LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e); + continue; + } + + if (!NamedQueueService.class.isAssignableFrom(clz)) { + LOG.warn("Class {} is not implementor of NamedQueueService.", clz); + continue; + } + + // add all service mappings here + try { + NamedQueueService namedQueueService = + (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf); + namedQueueServices.put(namedQueueService.getEvent(), namedQueueService); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException + | InvocationTargetException e) { + LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", + clz); + } + } + } + + /** + * Called when a publisher has published an event to the {@link RingBuffer}. + * This is generic consumer of disruptor ringbuffer and for each new namedQueue that we + * add, we should also provide specific consumer logic here. + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from + * the {@link RingBuffer} + */ + @Override + public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) { + final NamedQueuePayload namedQueuePayload = event.getPayload(); + // consume ringbuffer payload based on event type + namedQueueServices.get(namedQueuePayload.getNamedQueueEvent()) + .consumeEventFromDisruptor(namedQueuePayload); + } + + /** + * Cleans up queues maintained by services. + * + * @param namedQueueEvent type of queue to clear + * @return true if queue is cleaned up, false otherwise + */ + boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + return namedQueueServices.get(namedQueueEvent).clearNamedQueue(); + } + + /** + * Add all in memory queue records to system table. The implementors can use system table + * or direct HDFS file or ZK as persistence system. + */ + void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + namedQueueServices.get(namedQueueEvent).persistAll(); + } + + /** + * Retrieve in memory queue records from ringbuffer + * + * @param request namedQueue request with event type + * @return queue records from ringbuffer after filter (if applied) + */ + NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java similarity index 96% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java index f4d850f..f04cb18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import java.util.ArrayList; import java.util.List; @@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience; * Event Handler utility class */ @InterfaceAudience.Private -class LogHandlerUtils { +public class LogHandlerUtils { private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) { int totalFilters = 0; @@ -91,7 +91,7 @@ class LogHandlerUtils { return filteredSlowLogPayloads; } - static List<TooSlowLog.SlowLogPayload> getFilteredLogs( + public static List<TooSlowLog.SlowLogPayload> getFilteredLogs( AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) { int totalFilters = getTotalFiltersCount(request); if (totalFilters > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java similarity index 54% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java index 53a2ef1..7aa87fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java @@ -17,34 +17,33 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; -import com.lmax.disruptor.ExceptionHandler; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Exception Handler for Online Slow Log Ring Buffer + * Base payload to be prepared by client to send various namedQueue events for in-memory + * ring buffer storage in either HMaster or RegionServer. + * e.g slowLog responses */ @InterfaceAudience.Private -class DisruptorExceptionHandler implements ExceptionHandler<RingBufferEnvelope> { +public class NamedQueuePayload { - private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class); - - @Override - public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) { - LOG.error("Sequence={}, event={}", sequence, event, e); + public enum NamedQueueEvent { + SLOW_LOG } - @Override - public void handleOnStartException(Throwable e) { - LOG.error("Disruptor onStartException: ", e); + private final NamedQueueEvent namedQueueEvent; + + public NamedQueuePayload(NamedQueueEvent namedQueueEvent) { + if (namedQueueEvent == null) { + throw new RuntimeException("NamedQueuePayload with null namedQueueEvent"); + } + this.namedQueueEvent = namedQueueEvent; } - @Override - public void handleOnShutdownException(Throwable e) { - LOG.error("Disruptor onShutdownException: ", e); + public NamedQueueEvent getNamedQueueEvent() { + return namedQueueEvent; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java similarity index 50% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java index b0fb3e7..cb3512a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java @@ -17,87 +17,80 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import java.util.Collections; -import java.util.List; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; - /** - * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer. - * The service uses LMAX Disruptor to save slow records which are then consumed by + * NamedQueue recorder that maintains various named queues. + * The service uses LMAX Disruptor to save queue records which are then consumed by * a queue and based on the ring buffer size, the available records are then fetched * from the queue in thread-safe manner. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class SlowLogRecorder { +public class NamedQueueRecorder { private final Disruptor<RingBufferEnvelope> disruptor; private final LogEventHandler logEventHandler; - private final int eventCount; - private final boolean isOnlineLogProviderEnabled; - private static final String SLOW_LOG_RING_BUFFER_SIZE = - "hbase.regionserver.slowlog.ringbuffer.size"; + private static NamedQueueRecorder namedQueueRecorder; + private static boolean isInit = false; + private static final Object LOCK = new Object(); /** * Initialize disruptor with configurable ringbuffer size */ - public SlowLogRecorder(Configuration conf) { - isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, - HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - - if (!isOnlineLogProviderEnabled) { - this.disruptor = null; - this.logEventHandler = null; - this.eventCount = 0; - return; - } - - this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, - HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); + private NamedQueueRecorder(Configuration conf) { // This is the 'writer' -- a single threaded executor. This single thread consumes what is // put on the ringbuffer. final String hostingThreadName = Thread.currentThread().getName(); + int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); + // disruptor initialization with BlockingWaitStrategy this.disruptor = new Disruptor<>(RingBufferEnvelope::new, - getEventCount(), + getEventCount(eventCount), Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"), ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); // initialize ringbuffer event handler - final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, - HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); - this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf); + this.logEventHandler = new LogEventHandler(conf); this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler}); this.disruptor.start(); } + public static NamedQueueRecorder getInstance(Configuration conf) { + if (namedQueueRecorder != null) { + return namedQueueRecorder; + } + synchronized (LOCK) { + if (!isInit) { + namedQueueRecorder = new NamedQueueRecorder(conf); + isInit = true; + } + } + return namedQueueRecorder; + } + // must be power of 2 for disruptor ringbuffer - private int getEventCount() { - Preconditions.checkArgument(eventCount >= 0, - SLOW_LOG_RING_BUFFER_SIZE + " must be > 0"); + private int getEventCount(int eventCount) { + Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0"); int floor = Integer.highestOneBit(eventCount); if (floor == eventCount) { return floor; @@ -110,66 +103,53 @@ public class SlowLogRecorder { } /** - * Retrieve online slow logs from ringbuffer - * - * @param request slow log request parameters - * @return online slow logs from ringbuffer - */ - public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { - return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request) - : Collections.emptyList(); - } - - /** - * Retrieve online large logs from ringbuffer + * Retrieve in memory queue records from ringbuffer * - * @param request large log request parameters - * @return online large logs from ringbuffer + * @param request namedQueue request with event type + * @return queue records from ringbuffer after filter (if applied) */ - public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) { - return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request) - : Collections.emptyList(); + public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + return this.logEventHandler.getNamedQueueRecords(request); } /** - * clears slow log payloads from ringbuffer + * clears queue records from ringbuffer * + * @param namedQueueEvent type of queue to clear * @return true if slow log payloads are cleaned up or * hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to * clean up slow logs */ - public boolean clearSlowLogPayloads() { - if (!isOnlineLogProviderEnabled) { - return true; - } - return this.logEventHandler.clearSlowLogs(); + public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + return this.logEventHandler.clearNamedQueue(namedQueueEvent); } /** - * Add slow log rpcCall details to ringbuffer + * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog), + * consumer of disruptor ringbuffer will have specific logic. + * This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder + * constructor. * - * @param rpcLogDetails all details of rpc call that would be useful for ring buffer - * consumers + * @param namedQueuePayload namedQueue payload sent by client of ring buffer + * service */ - public void addSlowLogPayload(RpcLogDetails rpcLogDetails) { - if (!isOnlineLogProviderEnabled) { - return; - } + public void addRecord(NamedQueuePayload namedQueuePayload) { RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); long seqId = ringBuffer.next(); try { - ringBuffer.get(seqId).load(rpcLogDetails); + ringBuffer.get(seqId).load(namedQueuePayload); } finally { ringBuffer.publish(seqId); } } /** - * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch + * Add all in memory queue records to system table. The implementors can use system table + * or direct HDFS file or ZK as persistence system. */ - public void addAllLogsToSysTable() { + public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { if (this.logEventHandler != null) { - this.logEventHandler.addAllLogsToSysTable(); + this.logEventHandler.persistAll(namedQueueEvent); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java new file mode 100644 index 0000000..84c1b24 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * In-memory Queue service provider for multiple use-cases. Implementers should be + * registered in LogEventHandler + */ +@InterfaceAudience.Private +public interface NamedQueueService { + + /** + * Retrieve event type for NamedQueueService implementation. + * + * @return {@link NamedQueuePayload.NamedQueueEvent} + */ + NamedQueuePayload.NamedQueueEvent getEvent(); + + /** + * This implementation is generic for consuming records from LMAX + * disruptor and inserts records to EvictingQueue which is maintained by each + * ringbuffer provider. + * + * @param namedQueuePayload namedQueue payload from disruptor ring buffer + */ + void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload); + + /** + * Cleans up queues maintained by services. + * + * @return true if slow log payloads are cleaned up, false otherwise + */ + boolean clearNamedQueue(); + + /** + * Retrieve in memory queue records from ringbuffer + * + * @param request namedQueue request with event type + * @return queue records from ringbuffer after filter (if applied) + */ + NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request); + + /** + * Add all in memory queue records to system table. The implementors can use system table + * or direct HDFS file or ZK as persistence system. + */ + void persistAll(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java similarity index 68% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java index d308670..f93baaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java @@ -17,29 +17,29 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; /** - * An envelope to carry payload in the slow log ring buffer that serves as online buffer - * to provide latest TooSlowLog + * An envelope to carry payload in the ring buffer that serves as online buffer + * to provide latest events */ @InterfaceAudience.Private final class RingBufferEnvelope { - private RpcLogDetails rpcLogDetails; + private NamedQueuePayload namedQueuePayload; /** * Load the Envelope with {@link RpcCall} * - * @param rpcLogDetails all details of rpc call that would be useful for ring buffer + * @param namedQueuePayload all details of rpc call that would be useful for ring buffer * consumers */ - public void load(RpcLogDetails rpcLogDetails) { - this.rpcLogDetails = rpcLogDetails; + public void load(NamedQueuePayload namedQueuePayload) { + this.namedQueuePayload = namedQueuePayload; } /** @@ -48,10 +48,10 @@ final class RingBufferEnvelope { * * @return Retrieve rpc log details */ - public RpcLogDetails getPayload() { - final RpcLogDetails rpcLogDetails = this.rpcLogDetails; - this.rpcLogDetails = null; - return rpcLogDetails; + public NamedQueuePayload getPayload() { + final NamedQueuePayload namedQueuePayload = this.namedQueuePayload; + this.namedQueuePayload = null; + return namedQueuePayload; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index b469cdb..581d1a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; @@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; * RpcCall details that would be passed on to ring buffer of slow log responses */ @InterfaceAudience.Private -public class RpcLogDetails { +public class RpcLogDetails extends NamedQueuePayload { private final RpcCall rpcCall; private final Message param; @@ -40,6 +40,7 @@ public class RpcLogDetails { public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, String className, boolean isSlowLog, boolean isLargeLog) { + super(NamedQueueEvent.SLOW_LOG); this.rpcCall = rpcCall; this.param = param; this.clientAddress = clientAddress; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java new file mode 100644 index 0000000..2c701ff --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; +import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; +import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Persistent service provider for Slow/LargeLog events + */ +@InterfaceAudience.Private +public class SlowLogPersistentService { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class); + + private static final ReentrantLock LOCK = new ReentrantLock(); + private static final String SYS_TABLE_QUEUE_SIZE = + "hbase.regionserver.slowlog.systable.queue.size"; + private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; + private static final int SYSTABLE_PUT_BATCH_SIZE = 100; + + private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable; + + private final Configuration configuration; + + public SlowLogPersistentService(final Configuration configuration) { + this.configuration = configuration; + int sysTableQueueSize = + configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); + EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable = + EvictingQueue.create(sysTableQueueSize); + queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); + } + + public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) { + queueForSysTable.add(slowLogPayload); + } + + /** + * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch + */ + public void addAllLogsToSysTable() { + if (queueForSysTable == null) { + LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting."); + return; + } + if (LOCK.isLocked()) { + return; + } + LOCK.lock(); + try { + List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>(); + int i = 0; + while (!queueForSysTable.isEmpty()) { + slowLogPayloads.add(queueForSysTable.poll()); + i++; + if (i == SYSTABLE_PUT_BATCH_SIZE) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + slowLogPayloads.clear(); + i = 0; + } + } + if (slowLogPayloads.size() > 0) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + } + } finally { + LOCK.unlock(); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java similarity index 84% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java index 77749f7..bc892e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; @@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class); - private final SlowLogRecorder slowLogRecorder; + private final NamedQueueRecorder namedQueueRecorder; /** * Chore Constructor @@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore { * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will * cancel and cleanup * @param period Period in millis with which this Chore repeats execution when scheduled - * @param slowLogRecorder {@link SlowLogRecorder} instance + * @param namedQueueRecorder {@link NamedQueueRecorder} instance */ public SlowLogTableOpsChore(final Stoppable stopper, final int period, - final SlowLogRecorder slowLogRecorder) { + final NamedQueueRecorder namedQueueRecorder) { super("SlowLogTableOpsChore", stopper, period); - this.slowLogRecorder = slowLogRecorder; + this.namedQueueRecorder = namedQueueRecorder; } @Override @@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore { if (LOG.isTraceEnabled()) { LOG.trace("SlowLog Table Ops Chore is starting up."); } - slowLogRecorder.addAllLogsToSysTable(); + namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); if (LOG.isTraceEnabled()) { LOG.trace("SlowLog Table Ops Chore is closing."); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java new file mode 100644 index 0000000..f26ff51 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues.impl; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.SlowLogParams; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.namequeues.NamedQueueService; +import org.apache.hadoop.hbase.namequeues.RpcLogDetails; +import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; +import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; + +/** + * In-memory Queue service provider for Slow/LargeLog events + */ +@InterfaceAudience.Private +public class SlowLogQueueService implements NamedQueueService { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class); + + private static final String SLOW_LOG_RING_BUFFER_SIZE = + "hbase.regionserver.slowlog.ringbuffer.size"; + + private final boolean isOnlineLogProviderEnabled; + private final boolean isSlowLogTableEnabled; + private final SlowLogPersistentService slowLogPersistentService; + private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue; + + public SlowLogQueueService(Configuration conf) { + this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + + if (!isOnlineLogProviderEnabled) { + this.isSlowLogTableEnabled = false; + this.slowLogPersistentService = null; + this.slowLogQueue = null; + return; + } + + // Initialize SlowLog Queue + int slowLogQueueSize = + conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); + + EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = + EvictingQueue.create(slowLogQueueSize); + slowLogQueue = Queues.synchronizedQueue(evictingQueue); + + this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, + HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); + if (isSlowLogTableEnabled) { + slowLogPersistentService = new SlowLogPersistentService(conf); + } else { + slowLogPersistentService = null; + } + } + + @Override + public NamedQueuePayload.NamedQueueEvent getEvent() { + return NamedQueuePayload.NamedQueueEvent.SLOW_LOG; + } + + /** + * This implementation is specific to slowLog event. This consumes slowLog event from + * disruptor and inserts records to EvictingQueue. + * + * @param namedQueuePayload namedQueue payload from disruptor ring buffer + */ + @Override + public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { + if (!isOnlineLogProviderEnabled) { + return; + } + if (!(namedQueuePayload instanceof RpcLogDetails)) { + LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails."); + return; + } + final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload; + final RpcCall rpcCall = rpcLogDetails.getRpcCall(); + final String clientAddress = rpcLogDetails.getClientAddress(); + final long responseSize = rpcLogDetails.getResponseSize(); + final String className = rpcLogDetails.getClassName(); + final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails); + if (type == null) { + return; + } + Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); + Message param = rpcLogDetails.getParam(); + long receiveTime = rpcCall.getReceiveTime(); + long startTime = rpcCall.getStartTime(); + long endTime = System.currentTimeMillis(); + int processingTime = (int) (endTime - startTime); + int qTime = (int) (startTime - receiveTime); + final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); + int numGets = 0; + int numMutations = 0; + int numServiceCalls = 0; + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; + for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { + for (ClientProtos.Action action : regionAction.getActionList()) { + if (action.hasMutation()) { + numMutations++; + } + if (action.hasGet()) { + numGets++; + } + if (action.hasServiceCall()) { + numServiceCalls++; + } + } + } + } + final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); + final String methodDescriptorName = + methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; + TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder() + .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") + .setClientAddress(clientAddress) + .setMethodName(methodDescriptorName) + .setMultiGets(numGets) + .setMultiMutations(numMutations) + .setMultiServiceCalls(numServiceCalls) + .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) + .setProcessingTime(processingTime) + .setQueueTime(qTime) + .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) + .setResponseSize(responseSize) + .setServerClass(className) + .setStartTime(startTime) + .setType(type) + .setUserName(userName) + .build(); + slowLogQueue.add(slowLogPayload); + if (isSlowLogTableEnabled) { + if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { + slowLogPersistentService.addToQueueForSysTable(slowLogPayload); + } + } + } + + @Override + public boolean clearNamedQueue() { + if (!isOnlineLogProviderEnabled) { + return false; + } + LOG.debug("Received request to clean up online slowlog buffer."); + slowLogQueue.clear(); + return true; + } + + @Override + public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + if (!isOnlineLogProviderEnabled) { + return null; + } + final AdminProtos.SlowLogResponseRequest slowLogResponseRequest = + request.getSlowLogResponseRequest(); + final List<TooSlowLog.SlowLogPayload> slowLogPayloads; + if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG + .equals(slowLogResponseRequest.getLogType())) { + slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest); + } else { + slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest); + } + NamedQueueGetResponse response = new NamedQueueGetResponse(); + response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + response.setSlowLogPayloads(slowLogPayloads); + return response; + } + + private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { + final boolean isSlowLog = rpcCallDetails.isSlowLog(); + final boolean isLargeLog = rpcCallDetails.isLargeLog(); + final TooSlowLog.SlowLogPayload.Type type; + if (!isSlowLog && !isLargeLog) { + LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", + rpcCallDetails); + return null; + } + if (isSlowLog && isLargeLog) { + type = TooSlowLog.SlowLogPayload.Type.ALL; + } else if (isSlowLog) { + type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG; + } else { + type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG; + } + return type; + } + + /** + * Add all slowLog events to system table. This is only for slowLog event's persistence on + * system table. + */ + @Override + public void persistAll() { + if (!isOnlineLogProviderEnabled) { + return; + } + if (slowLogPersistentService != null) { + slowLogPersistentService.addAllLogsToSysTable(); + } + } + + private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads( + final AdminProtos.SlowLogResponseRequest request) { + List<TooSlowLog.SlowLogPayload> slowLogPayloadList = + Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter( + e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL + || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList()); + // latest slow logs first, operator is interested in latest records from in-memory buffer + Collections.reverse(slowLogPayloadList); + return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); + } + + private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads( + final AdminProtos.SlowLogResponseRequest request) { + List<TooSlowLog.SlowLogPayload> slowLogPayloadList = + Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter( + e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL + || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList()); + // latest large logs first, operator is interested in latest records from in-memory buffer + Collections.reverse(slowLogPayloadList); + return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java new file mode 100644 index 0000000..6e88bf4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues.request; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Request object to be used by ring buffer use-cases. Clients get records by sending + * this request object. + * For each ring buffer use-case, add request payload to this class, client should set + * namedQueueEvent based on use-case. + * Protobuf does not support inheritance, hence we need to work with + */ +@InterfaceAudience.Private +public class NamedQueueGetRequest { + + private AdminProtos.SlowLogResponseRequest slowLogResponseRequest; + private NamedQueuePayload.NamedQueueEvent namedQueueEvent; + + public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() { + return slowLogResponseRequest; + } + + public void setSlowLogResponseRequest( + AdminProtos.SlowLogResponseRequest slowLogResponseRequest) { + this.slowLogResponseRequest = slowLogResponseRequest; + } + + public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { + return namedQueueEvent; + } + + public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + this.namedQueueEvent = namedQueueEvent; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("slowLogResponseRequest", slowLogResponseRequest) + .append("namedQueueEvent", namedQueueEvent) + .toString(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java new file mode 100644 index 0000000..ee4ed43 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.namequeues.response; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; +import org.apache.yetus.audience.InterfaceAudience; +import java.util.List; + +/** + * Response object to be sent by namedQueue service back to caller + */ +@InterfaceAudience.Private +public class NamedQueueGetResponse { + + private List<TooSlowLog.SlowLogPayload> slowLogPayloads; + private NamedQueuePayload.NamedQueueEvent namedQueueEvent; + + public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() { + return slowLogPayloads; + } + + public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) { + this.slowLogPayloads = slowLogPayloads; + } + + public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { + return namedQueueEvent; + } + + public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + this.namedQueueEvent = namedQueueEvent; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("slowLogPayloads", slowLogPayloads) + .append("namedQueueEvent", namedQueueEvent) + .toString(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 99fddf6..2df4d83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; -import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; -import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -536,7 +536,7 @@ public class HRegionServer extends Thread implements /** * Provide online slow log responses from ringbuffer */ - private SlowLogRecorder slowLogRecorder; + private NamedQueueRecorder namedQueueRecorder = null; /** * True if this RegionServer is coming up in a cluster where there is no Master; @@ -595,7 +595,12 @@ public class HRegionServer extends Thread implements this.stopped = false; if (!(this instanceof HMaster)) { - this.slowLogRecorder = new SlowLogRecorder(this.conf); + final boolean isOnlineLogProviderEnabled = conf.getBoolean( + HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + if (isOnlineLogProviderEnabled) { + this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); + } } rpcServices = createRpcServices(); useThisHostnameInstead = getUseThisHostnameInstead(conf); @@ -1494,12 +1499,12 @@ public class HRegionServer extends Thread implements } /** - * get Online SlowLog Provider to add slow logs to ringbuffer + * get NamedQueue Provider to add different logs to ringbuffer * - * @return Online SlowLog Provider + * @return NamedQueueRecorder */ - public SlowLogRecorder getSlowLogRecorder() { - return this.slowLogRecorder; + public NamedQueueRecorder getNamedQueueRecorder() { + return this.namedQueueRecorder; } /* @@ -2091,7 +2096,7 @@ public class HRegionServer extends Thread implements if (isSlowLogTableEnabled) { // default chore duration: 10 min final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); - slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder); + slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); } // Create the thread to clean the moved regions list diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 56de741..d49bf4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -109,6 +109,9 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; @@ -129,7 +132,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; -import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; @@ -1305,7 +1308,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); rpcServer.setRsRpcServices(this); if (!(rs instanceof HMaster)) { - rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder()); + rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder()); } scannerLeaseTimeoutPeriod = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, @@ -4071,28 +4074,39 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @QosPriority(priority = HConstants.ADMIN_QOS) public SlowLogResponses getSlowLogResponses(final RpcController controller, final SlowLogResponseRequest request) { - final SlowLogRecorder slowLogRecorder = - this.regionServer.getSlowLogRecorder(); - final List<SlowLogPayload> slowLogPayloads; - slowLogPayloads = slowLogRecorder != null - ? slowLogRecorder.getSlowLogPayloads(request) - : Collections.emptyList(); + final NamedQueueRecorder namedQueueRecorder = + this.regionServer.getNamedQueueRecorder(); + final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() .addAllSlowLogPayloads(slowLogPayloads) .build(); return slowLogResponses; } + private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request, + NamedQueueRecorder namedQueueRecorder) { + if (namedQueueRecorder == null) { + return Collections.emptyList(); + } + List<SlowLogPayload> slowLogPayloads; + NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setSlowLogResponseRequest(request); + NamedQueueGetResponse namedQueueGetResponse = + namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + slowLogPayloads = namedQueueGetResponse != null ? + namedQueueGetResponse.getSlowLogPayloads() : + Collections.emptyList(); + return slowLogPayloads; + } + @Override @QosPriority(priority = HConstants.ADMIN_QOS) public SlowLogResponses getLargeLogResponses(final RpcController controller, final SlowLogResponseRequest request) { - final SlowLogRecorder slowLogRecorder = - this.regionServer.getSlowLogRecorder(); - final List<SlowLogPayload> slowLogPayloads; - slowLogPayloads = slowLogRecorder != null - ? slowLogRecorder.getLargeLogPayloads(request) - : Collections.emptyList(); + final NamedQueueRecorder namedQueueRecorder = + this.regionServer.getNamedQueueRecorder(); + final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() .addAllSlowLogPayloads(slowLogPayloads) .build(); @@ -4103,10 +4117,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @QosPriority(priority = HConstants.ADMIN_QOS) public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, final ClearSlowLogResponseRequest request) { - final SlowLogRecorder slowLogRecorder = - this.regionServer.getSlowLogRecorder(); - boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder) - .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false); + final NamedQueueRecorder namedQueueRecorder = + this.regionServer.getNamedQueueRecorder(); + boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) + .map(queueRecorder -> + queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) + .orElse(false); ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder() .setIsCleaned(slowLogsCleaned) .build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java deleted file mode 100644 index 9c147e3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver.slowlog; - -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.SlowLogParams; -import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; -import org.apache.hbase.thirdparty.com.google.common.collect.Queues; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; - -/** - * Event Handler run by disruptor ringbuffer consumer - */ -@InterfaceAudience.Private -class LogEventHandler implements EventHandler<RingBufferEnvelope> { - - private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); - - private static final String SYS_TABLE_QUEUE_SIZE = - "hbase.regionserver.slowlog.systable.queue.size"; - private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; - private static final int SYSTABLE_PUT_BATCH_SIZE = 100; - - private final Queue<SlowLogPayload> queueForRingBuffer; - private final Queue<SlowLogPayload> queueForSysTable; - private final boolean isSlowLogTableEnabled; - - private Configuration configuration; - - private static final ReentrantLock LOCK = new ReentrantLock(); - - LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) { - this.configuration = conf; - EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount); - queueForRingBuffer = Queues.synchronizedQueue(evictingQueue); - this.isSlowLogTableEnabled = isSlowLogTableEnabled; - if (isSlowLogTableEnabled) { - int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); - EvictingQueue<SlowLogPayload> evictingQueueForTable = - EvictingQueue.create(sysTableQueueSize); - queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); - } else { - queueForSysTable = null; - } - } - - /** - * Called when a publisher has published an event to the {@link RingBuffer} - * - * @param event published to the {@link RingBuffer} - * @param sequence of the event being processed - * @param endOfBatch flag to indicate if this is the last event in a batch from - * the {@link RingBuffer} - * @throws Exception if the EventHandler would like the exception handled further up the chain - */ - @Override - public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) - throws Exception { - final RpcLogDetails rpcCallDetails = event.getPayload(); - final RpcCall rpcCall = rpcCallDetails.getRpcCall(); - final String clientAddress = rpcCallDetails.getClientAddress(); - final long responseSize = rpcCallDetails.getResponseSize(); - final String className = rpcCallDetails.getClassName(); - final SlowLogPayload.Type type = getLogType(rpcCallDetails); - if (type == null) { - return; - } - Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); - Message param = rpcCallDetails.getParam(); - long receiveTime = rpcCall.getReceiveTime(); - long startTime = rpcCall.getStartTime(); - long endTime = System.currentTimeMillis(); - int processingTime = (int) (endTime - startTime); - int qTime = (int) (startTime - receiveTime); - final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); - int numGets = 0; - int numMutations = 0; - int numServiceCalls = 0; - if (param instanceof ClientProtos.MultiRequest) { - ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; - for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { - for (ClientProtos.Action action : regionAction.getActionList()) { - if (action.hasMutation()) { - numMutations++; - } - if (action.hasGet()) { - numGets++; - } - if (action.hasServiceCall()) { - numServiceCalls++; - } - } - } - } - final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); - final String methodDescriptorName = - methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; - SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder() - .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") - .setClientAddress(clientAddress) - .setMethodName(methodDescriptorName) - .setMultiGets(numGets) - .setMultiMutations(numMutations) - .setMultiServiceCalls(numServiceCalls) - .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) - .setProcessingTime(processingTime) - .setQueueTime(qTime) - .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) - .setResponseSize(responseSize) - .setServerClass(className) - .setStartTime(startTime) - .setType(type) - .setUserName(userName) - .build(); - queueForRingBuffer.add(slowLogPayload); - if (isSlowLogTableEnabled) { - if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { - queueForSysTable.add(slowLogPayload); - } - } - } - - private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { - final boolean isSlowLog = rpcCallDetails.isSlowLog(); - final boolean isLargeLog = rpcCallDetails.isLargeLog(); - final SlowLogPayload.Type type; - if (!isSlowLog && !isLargeLog) { - LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", - rpcCallDetails); - return null; - } - if (isSlowLog && isLargeLog) { - type = SlowLogPayload.Type.ALL; - } else if (isSlowLog) { - type = SlowLogPayload.Type.SLOW_LOG; - } else { - type = SlowLogPayload.Type.LARGE_LOG; - } - return type; - } - - /** - * Cleans up slow log payloads - * - * @return true if slow log payloads are cleaned up, false otherwise - */ - boolean clearSlowLogs() { - if (LOG.isDebugEnabled()) { - LOG.debug("Received request to clean up online slowlog buffer.."); - } - queueForRingBuffer.clear(); - return true; - } - - /** - * Retrieve list of slow log payloads - * - * @param request slow log request parameters - * @return list of slow log payloads - */ - List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { - List<SlowLogPayload> slowLogPayloadList = - Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) - .filter(e -> e.getType() == SlowLogPayload.Type.ALL - || e.getType() == SlowLogPayload.Type.SLOW_LOG) - .collect(Collectors.toList()); - - // latest slow logs first, operator is interested in latest records from in-memory buffer - Collections.reverse(slowLogPayloadList); - - return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); - } - - /** - * Retrieve list of large log payloads - * - * @param request large log request parameters - * @return list of large log payloads - */ - List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { - List<SlowLogPayload> slowLogPayloadList = - Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) - .filter(e -> e.getType() == SlowLogPayload.Type.ALL - || e.getType() == SlowLogPayload.Type.LARGE_LOG) - .collect(Collectors.toList()); - - // latest large logs first, operator is interested in latest records from in-memory buffer - Collections.reverse(slowLogPayloadList); - - return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); - } - - /** - * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch - */ - void addAllLogsToSysTable() { - if (queueForSysTable == null) { - // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting. - return; - } - if (LOCK.isLocked()) { - return; - } - LOCK.lock(); - try { - List<SlowLogPayload> slowLogPayloads = new ArrayList<>(); - int i = 0; - while (!queueForSysTable.isEmpty()) { - slowLogPayloads.add(queueForSysTable.poll()); - i++; - if (i == SYSTABLE_PUT_BATCH_SIZE) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); - slowLogPayloads.clear(); - i = 0; - } - } - if (slowLogPayloads.size() > 0) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); - } - } finally { - LOCK.unlock(); - } - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java similarity index 76% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index f90bbc0..542efc3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -17,12 +17,14 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -35,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -61,27 +65,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo * Tests for Online SlowLog Provider Service */ @Category({MasterTests.class, MediumTests.class}) -public class TestSlowLogRecorder { +public class TestNamedQueueRecorder { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSlowLogRecorder.class); + HBaseClassTestRule.forClass(TestNamedQueueRecorder.class); - private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); + private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); - private SlowLogRecorder slowLogRecorder; + private NamedQueueRecorder namedQueueRecorder; private static int i = 0; private static Configuration applySlowLogRecorderConf(int eventSize) { - Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); return conf; - } /** @@ -90,11 +92,10 @@ public class TestSlowLogRecorder { * * @param i index of ringbuffer logs * @param j data value that was put on index i - * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder} + * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} * @return if actual values are as per expectations */ private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { - boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); @@ -102,15 +103,18 @@ public class TestSlowLogRecorder { } @Test - public void testOnlieSlowLogConsumption() throws Exception { + public void testOnlieSlowLogConsumption() throws Exception{ Configuration conf = applySlowLogRecorderConf(8); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); - slowLogRecorder.clearSlowLogPayloads(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); int i = 0; @@ -119,12 +123,12 @@ public class TestSlowLogRecorder { for (; i < 5; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } - Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5)); - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + Assert.assertNotEquals(-1, + HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); @@ -135,15 +139,15 @@ public class TestSlowLogRecorder { for (; i < 7; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7)); + () -> getSlowLogPayloads(request).size() == 7)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) && confirmPayloadParams(5, 2, slowLogPayloadsList) @@ -155,15 +159,15 @@ public class TestSlowLogRecorder { for (; i < 10; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); + () -> getSlowLogPayloads(request).size() == 8)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); // confirm ringbuffer is full return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) @@ -176,15 +180,15 @@ public class TestSlowLogRecorder { for (; i < 14; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); + () -> getSlowLogPayloads(request).size() == 8)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); // confirm ringbuffer is full // and ordered events return slowLogPayloadsList.size() == 8 @@ -195,9 +199,14 @@ public class TestSlowLogRecorder { }) ); + AdminProtos.SlowLogResponseRequest largeLogRequest = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(15) + .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) + .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); // confirm ringbuffer is full // and ordered events return slowLogPayloadsList.size() == 8 @@ -210,11 +219,12 @@ public class TestSlowLogRecorder { Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); + boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue( + NamedQueuePayload.NamedQueueEvent.SLOW_LOG); LOG.debug("cleared the ringbuffer of Online Slow Log records"); - List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); // confirm ringbuffer is empty return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; }) @@ -222,30 +232,43 @@ public class TestSlowLogRecorder { } + private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { + NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setSlowLogResponseRequest(request); + NamedQueueGetResponse namedQueueGetResponse = + namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + return namedQueueGetResponse == null ? + Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads(); + } + @Test public void testOnlineSlowLogWithHighRecords() throws Exception { Configuration conf = applySlowLogRecorderConf(14); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int i = 0; i < 14 * 11; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); + () -> getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); // confirm strict order of slow log payloads return slowLogPayloads.size() == 14 @@ -266,37 +289,37 @@ public class TestSlowLogRecorder { }) ); - boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); + boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue( + NamedQueuePayload.NamedQueueEvent.SLOW_LOG); Assert.assertTrue(isRingBufferCleaned); LOG.debug("cleared the ringbuffer of Online Slow Log records"); - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); // confirm ringbuffer is empty Assert.assertEquals(slowLogPayloads.size(), 0); - } @Test public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { - Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); - for (int i = 0; i < 300; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } - Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); return slowLogPayloads.size() == 0; }) ); @@ -305,56 +328,58 @@ public class TestSlowLogRecorder { @Test public void testOnlineSlowLogWithDisableConfig() throws Exception { - Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); - slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); - for (int i = 0; i < 300; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } - Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); return slowLogPayloads.size() == 0; }) ); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); - } @Test public void testSlowLogFilters() throws Exception { Configuration conf = applySlowLogRecorderConf(30); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .setUserName("userName_87") .build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int i = 0; i < 100; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1)); + () -> getSlowLogPayloads(request).size() == 1)); AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -362,25 +387,32 @@ public class TestSlowLogRecorder { .setClientAddress("client_85") .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1)); + () -> getSlowLogPayloads(requestClient).size() == 1)); AdminProtos.SlowLogResponseRequest requestSlowLog = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); - + () -> getSlowLogPayloads(requestSlowLog).size() == 15)); } @Test public void testConcurrentSlowLogEvents() throws Exception { Configuration conf = applySlowLogRecorderConf(50000); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + AdminProtos.SlowLogResponseRequest largeLogRequest = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(500000) + .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) + .build(); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int j = 0; j < 1000; j++) { @@ -389,7 +421,7 @@ public class TestSlowLogRecorder { for (int i = 0; i < 3500; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } }); @@ -397,22 +429,24 @@ public class TestSlowLogRecorder { Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - slowLogRecorder.clearSlowLogPayloads(); - Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( - 5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000)); + 5000, () -> getSlowLogPayloads(request).size() > 10000)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( - 5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000)); + 5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); } @Test public void testSlowLargeLogEvents() throws Exception { Configuration conf = applySlowLogRecorderConf(28); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); boolean isSlowLog; @@ -428,16 +462,16 @@ public class TestSlowLogRecorder { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1), isSlowLog, isLargeLog); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); + () -> getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); // confirm strict order of slow log payloads return slowLogPayloads.size() == 14 @@ -458,12 +492,18 @@ public class TestSlowLogRecorder { }) ); + AdminProtos.SlowLogResponseRequest largeLogRequest = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(14 * 11) + .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) + .build(); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14)); + () -> getSlowLogPayloads(largeLogRequest).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request); + List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); // confirm strict order of slow log payloads return largeLogPayloads.size() == 14 @@ -483,14 +523,16 @@ public class TestSlowLogRecorder { && confirmPayloadParams(13, 128, largeLogPayloads); }) ); - } @Test public void testSlowLogMixedFilters() throws Exception { Configuration conf = applySlowLogRecorderConf(30); - slowLogRecorder = new SlowLogRecorder(conf); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) @@ -498,23 +540,23 @@ public class TestSlowLogRecorder { .setClientAddress("client_88") .build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); for (int i = 0; i < 100; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2)); + () -> getSlowLogPayloads(request).size() == 2)); AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .setUserName("userName_1") .setClientAddress("client_2") .build(); - Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size()); + Assert.assertEquals(0, getSlowLogPayloads(request2).size()); AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -523,7 +565,7 @@ public class TestSlowLogRecorder { .setClientAddress("client_88") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) .build(); - Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size()); + Assert.assertEquals(0, getSlowLogPayloads(request3).size()); AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -532,7 +574,7 @@ public class TestSlowLogRecorder { .setClientAddress("client_87") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) .build(); - Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size()); + Assert.assertEquals(1, getSlowLogPayloads(request4).size()); AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -541,14 +583,14 @@ public class TestSlowLogRecorder { .setClientAddress("client_89") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR) .build(); - Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size()); + Assert.assertEquals(2, getSlowLogPayloads(request5).size()); AdminProtos.SlowLogResponseRequest requestSlowLog = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); + () -> getSlowLogPayloads(requestSlowLog).size() == 15)); } static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { @@ -762,26 +804,20 @@ public class TestSlowLogRecorder { private static Optional<User> getUser(String userName) { return Optional.of(new User() { - - @Override public String getShortName() { return userName; } - @Override public <T> T runAs(PrivilegedAction<T> action) { return null; } - @Override - public <T> T runAs(PrivilegedExceptionAction<T> action) throws - IOException, InterruptedException { + public <T> T runAs(PrivilegedExceptionAction<T> action) { return null; } - }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java similarity index 76% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java index e08ad29..a914796 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java @@ -17,10 +17,11 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver.slowlog; +package org.apache.hadoop.hbase.namequeues; import java.io.IOException; import java.lang.reflect.Field; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -33,8 +34,11 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -59,11 +63,11 @@ public class TestSlowLogAccessor { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestSlowLogAccessor.class); - private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); + private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); - private SlowLogRecorder slowLogRecorder; + private NamedQueueRecorder namedQueueRecorder; @BeforeClass public static void setup() throws Exception { @@ -88,9 +92,19 @@ public class TestSlowLogAccessor { @Before public void setUp() throws Exception { HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0); - Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder"); + Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder"); slowLogRecorder.setAccessible(true); - this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer); + this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer); + } + + private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads( + AdminProtos.SlowLogResponseRequest request) { + NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setSlowLogResponseRequest(request); + NamedQueueGetResponse namedQueueGetResponse = + namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + return namedQueueGetResponse.getSlowLogPayloads(); } @Test @@ -99,42 +113,42 @@ public class TestSlowLogAccessor { AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); - slowLogRecorder.clearSlowLogPayloads(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); int i = 0; Connection connection = waitForSlowLogTableCreation(); // add 5 records initially for (; i < 5; i++) { - RpcLogDetails rpcLogDetails = TestSlowLogRecorder + RpcLogDetails rpcLogDetails = TestNamedQueueRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } // add 2 more records for (; i < 7; i++) { - RpcLogDetails rpcLogDetails = TestSlowLogRecorder + RpcLogDetails rpcLogDetails = TestNamedQueueRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } // add 3 more records for (; i < 10; i++) { - RpcLogDetails rpcLogDetails = TestSlowLogRecorder + RpcLogDetails rpcLogDetails = TestNamedQueueRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } // add 4 more records for (; i < 14; i++) { - RpcLogDetails rpcLogDetails = TestSlowLogRecorder + RpcLogDetails rpcLogDetails = TestNamedQueueRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY - .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); + .waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14)); @@ -170,10 +184,10 @@ public class TestSlowLogAccessor { public void testHigherSlowLogs() throws Exception { Connection connection = waitForSlowLogTableCreation(); - slowLogRecorder.clearSlowLogPayloads(); + namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); - Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); for (int j = 0; j < 100; j++) { CompletableFuture.runAsync(() -> { @@ -181,15 +195,15 @@ public class TestSlowLogAccessor { if (i == 300) { Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); } - RpcLogDetails rpcLogDetails = TestSlowLogRecorder + RpcLogDetails rpcLogDetails = TestNamedQueueRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - slowLogRecorder.addSlowLogPayload(rpcLogDetails); + namedQueueRecorder.addRecord(rpcLogDetails); } }); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> { - int count = slowLogRecorder.getSlowLogPayloads(request).size(); + int count = getSlowLogPayloads(request).size(); LOG.debug("RingBuffer records count: {}", count); return count > 2000; })); diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb index 3a0cb90..11798c3 100644 --- a/hbase-shell/src/test/ruby/hbase/admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb @@ -404,7 +404,7 @@ module Hbase define_test 'clear slowlog responses should work' do output = capture_stdout { command(:clear_slowlog_responses, nil) } - assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers')) + assert(output.include?('Cleared Slowlog responses from 0/1 RegionServers')) end #------------------------------------------------------------------------------- diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java index b1c71f8..b853431 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java @@ -200,6 +200,7 @@ public class TestThriftHBaseServiceHandler { @BeforeClass public static void beforeClass() throws Exception { UTIL.getConfiguration().set("hbase.client.retries.number", "3"); + UTIL.getConfiguration().setBoolean("hbase.regionserver.slowlog.buffer.enabled", true); UTIL.startMiniCluster(); TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor = new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(tableAname));