This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push: new f2c087c Revert "HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses)" f2c087c is described below commit f2c087caeb232ac259d0eb7569a9e9c97c3d9336 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Tue Jul 21 13:50:27 2020 +0530 Revert "HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses)" This reverts commit 16f306b4a17e0f3ab8e3381bde9b2c4c7f742ef9. TestAdminShell and TestThriftHBaseServiceHandler are failing consistently --- .../hbase/shaded/protobuf/RequestConverter.java | 5 - hbase-common/src/main/resources/hbase-default.xml | 12 - .../src/main/protobuf/server/region/Admin.proto | 6 - .../org/apache/hadoop/hbase/ipc/RpcServer.java | 28 ++- .../hadoop/hbase/ipc/RpcServerInterface.java | 10 +- .../hadoop/hbase/namequeues/LogEventHandler.java | 130 ---------- .../hadoop/hbase/namequeues/NamedQueuePayload.java | 49 ---- .../hadoop/hbase/namequeues/NamedQueueService.java | 69 ------ .../hbase/namequeues/SlowLogPersistentService.java | 98 -------- .../hbase/namequeues/impl/SlowLogQueueService.java | 264 -------------------- .../namequeues/request/NamedQueueGetRequest.java | 65 ----- .../namequeues/response/NamedQueueGetResponse.java | 61 ----- .../hadoop/hbase/regionserver/HRegionServer.java | 23 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 52 ++-- .../slowlog}/DisruptorExceptionHandler.java | 2 +- .../regionserver/slowlog/LogEventHandler.java | 266 +++++++++++++++++++++ .../slowlog}/LogHandlerUtils.java | 6 +- .../slowlog}/RingBufferEnvelope.java | 22 +- .../slowlog}/RpcLogDetails.java | 5 +- .../slowlog/SlowLogRecorder.java} | 120 ++++++---- .../slowlog}/SlowLogTableOpsChore.java | 12 +- .../slowlog}/TestSlowLogAccessor.java | 56 ++--- .../slowlog/TestSlowLogRecorder.java} | 216 +++++++---------- 23 files changed, 519 insertions(+), 1058 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 7b0282a..2be7ccd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1817,11 +1817,6 @@ public final class RequestConverter { } else { builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR); } - if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) { - builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG); - } else { - builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG); - } return builder.setLimit(logQueryFilter.getLimit()).build(); } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 982e243..23b12bb 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1990,16 +1990,4 @@ possible configurations would overwhelm and obscure the important. too large batch request. </description> </property> - <property> - <name>hbase.namedqueue.provider.classes</name> - <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value> - <description> - Default values for NamedQueueService implementors. This comma separated full class names - represent all implementors of NamedQueueService that we would like to be invoked by - LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which - is used to store slow/large RPC logs in ringbuffer at each RegionServer. - All implementors of NamedQueueService should be found under package: - "org.apache.hadoop.hbase.namequeues.impl" - </description> - </property> </configuration> diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 101ed1e..b8cfcde 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -288,18 +288,12 @@ message SlowLogResponseRequest { OR = 1; } - enum LogType { - SLOW_LOG = 0; - LARGE_LOG = 1; - } - optional string region_name = 1; optional string table_name = 2; optional string client_address = 3; optional string user_name = 4; optional uint32 limit = 5 [default = 10]; optional FilterByOperator filter_by_operator = 6 [default = OR]; - optional LogType log_type = 7; } message SlowLogResponses { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index cace5f0..12fd584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; @@ -47,8 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.namequeues.RpcLogDetails; -import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -95,9 +94,10 @@ public abstract class RpcServer implements RpcServerInterface, private static final String MULTI_GETS = "multi.gets"; private static final String MULTI_MUTATIONS = "multi.mutations"; private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; + private static final String GET_SLOW_LOG_RESPONSES = "GetSlowLogResponses"; + private static final String CLEAR_SLOW_LOGS_RESPONSES = "ClearSlowLogsResponses"; private final boolean authorize; - private final boolean isOnlineLogProviderEnabled; protected boolean isSecurityEnabled; public static final byte CURRENT_VERSION = 0; @@ -229,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface, /** * Use to add online slowlog responses */ - private NamedQueueRecorder namedQueueRecorder; + private SlowLogRecorder slowLogRecorder; @FunctionalInterface protected interface CallCleanup { @@ -304,8 +304,6 @@ public abstract class RpcServer implements RpcServerInterface, saslProps = Collections.emptyMap(); } - this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, - HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); this.scheduler = scheduler; } @@ -434,11 +432,11 @@ public abstract class RpcServer implements RpcServerInterface, tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, userName); - if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { + if (this.slowLogRecorder != null) { // send logs to ring buffer owned by slowLogRecorder - final String className = - server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); - this.namedQueueRecorder.addRecord( + final String className = server == null ? StringUtils.EMPTY : + server.getClass().getSimpleName(); + this.slowLogRecorder.addSlowLogPayload( new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow, tooLarge)); } @@ -821,8 +819,12 @@ public abstract class RpcServer implements RpcServerInterface, } @Override - public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) { - this.namedQueueRecorder = namedQueueRecorder; + public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) { + this.slowLogRecorder = slowLogRecorder; } + @Override + public SlowLogRecorder getSlowLogRecorder() { + return slowLogRecorder; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 99e0188..c8a71f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -102,8 +102,12 @@ public interface RpcServerInterface { /** * Set Online SlowLog Provider * - * @param namedQueueRecorder instance of {@link NamedQueueRecorder} + * @param slowLogRecorder instance of {@link SlowLogRecorder} */ - void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder); + void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder); + /** + * @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer + */ + SlowLogRecorder getSlowLogRecorder(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java deleted file mode 100644 index 8b8db2f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues; - -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; - -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Event Handler run by disruptor ringbuffer consumer. - * Although this is generic implementation for namedQueue, it can have individual queue specific - * logic. - */ -@InterfaceAudience.Private -class LogEventHandler implements EventHandler<RingBufferEnvelope> { - - private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); - - // Map that binds namedQueues to corresponding queue service implementation. - // If NamedQueue of specific type is enabled, corresponding service will be used to - // insert and retrieve records. - // Individual queue sizes should be determined based on their individual configs within - // each service. - private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices = - new HashMap<>(); - - private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes"; - - LogEventHandler(final Configuration conf) { - for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) { - Class<?> clz; - try { - clz = Class.forName(implName); - } catch (ClassNotFoundException e) { - LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e); - continue; - } - - if (!NamedQueueService.class.isAssignableFrom(clz)) { - LOG.warn("Class {} is not implementor of NamedQueueService.", clz); - continue; - } - - // add all service mappings here - try { - NamedQueueService namedQueueService = - (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf); - namedQueueServices.put(namedQueueService.getEvent(), namedQueueService); - } catch (InstantiationException | IllegalAccessException | NoSuchMethodException - | InvocationTargetException e) { - LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", - clz); - } - } - } - - /** - * Called when a publisher has published an event to the {@link RingBuffer}. - * This is generic consumer of disruptor ringbuffer and for each new namedQueue that we - * add, we should also provide specific consumer logic here. - * - * @param event published to the {@link RingBuffer} - * @param sequence of the event being processed - * @param endOfBatch flag to indicate if this is the last event in a batch from - * the {@link RingBuffer} - */ - @Override - public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) { - final NamedQueuePayload namedQueuePayload = event.getPayload(); - // consume ringbuffer payload based on event type - namedQueueServices.get(namedQueuePayload.getNamedQueueEvent()) - .consumeEventFromDisruptor(namedQueuePayload); - } - - /** - * Cleans up queues maintained by services. - * - * @param namedQueueEvent type of queue to clear - * @return true if queue is cleaned up, false otherwise - */ - boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - return namedQueueServices.get(namedQueueEvent).clearNamedQueue(); - } - - /** - * Add all in memory queue records to system table. The implementors can use system table - * or direct HDFS file or ZK as persistence system. - */ - void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - namedQueueServices.get(namedQueueEvent).persistAll(); - } - - /** - * Retrieve in memory queue records from ringbuffer - * - * @param request namedQueue request with event type - * @return queue records from ringbuffer after filter (if applied) - */ - NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { - return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java deleted file mode 100644 index 7aa87fa..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Base payload to be prepared by client to send various namedQueue events for in-memory - * ring buffer storage in either HMaster or RegionServer. - * e.g slowLog responses - */ -@InterfaceAudience.Private -public class NamedQueuePayload { - - public enum NamedQueueEvent { - SLOW_LOG - } - - private final NamedQueueEvent namedQueueEvent; - - public NamedQueuePayload(NamedQueueEvent namedQueueEvent) { - if (namedQueueEvent == null) { - throw new RuntimeException("NamedQueuePayload with null namedQueueEvent"); - } - this.namedQueueEvent = namedQueueEvent; - } - - public NamedQueueEvent getNamedQueueEvent() { - return namedQueueEvent; - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java deleted file mode 100644 index 84c1b24..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues; - -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * In-memory Queue service provider for multiple use-cases. Implementers should be - * registered in LogEventHandler - */ -@InterfaceAudience.Private -public interface NamedQueueService { - - /** - * Retrieve event type for NamedQueueService implementation. - * - * @return {@link NamedQueuePayload.NamedQueueEvent} - */ - NamedQueuePayload.NamedQueueEvent getEvent(); - - /** - * This implementation is generic for consuming records from LMAX - * disruptor and inserts records to EvictingQueue which is maintained by each - * ringbuffer provider. - * - * @param namedQueuePayload namedQueue payload from disruptor ring buffer - */ - void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload); - - /** - * Cleans up queues maintained by services. - * - * @return true if slow log payloads are cleaned up, false otherwise - */ - boolean clearNamedQueue(); - - /** - * Retrieve in memory queue records from ringbuffer - * - * @param request namedQueue request with event type - * @return queue records from ringbuffer after filter (if applied) - */ - NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request); - - /** - * Add all in memory queue records to system table. The implementors can use system table - * or direct HDFS file or ZK as persistence system. - */ - void persistAll(); -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java deleted file mode 100644 index 2c701ff..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; -import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; -import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; -import org.apache.hbase.thirdparty.com.google.common.collect.Queues; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Persistent service provider for Slow/LargeLog events - */ -@InterfaceAudience.Private -public class SlowLogPersistentService { - - private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class); - - private static final ReentrantLock LOCK = new ReentrantLock(); - private static final String SYS_TABLE_QUEUE_SIZE = - "hbase.regionserver.slowlog.systable.queue.size"; - private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; - private static final int SYSTABLE_PUT_BATCH_SIZE = 100; - - private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable; - - private final Configuration configuration; - - public SlowLogPersistentService(final Configuration configuration) { - this.configuration = configuration; - int sysTableQueueSize = - configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); - EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable = - EvictingQueue.create(sysTableQueueSize); - queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); - } - - public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) { - queueForSysTable.add(slowLogPayload); - } - - /** - * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch - */ - public void addAllLogsToSysTable() { - if (queueForSysTable == null) { - LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting."); - return; - } - if (LOCK.isLocked()) { - return; - } - LOCK.lock(); - try { - List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>(); - int i = 0; - while (!queueForSysTable.isEmpty()) { - slowLogPayloads.add(queueForSysTable.poll()); - i++; - if (i == SYSTABLE_PUT_BATCH_SIZE) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); - slowLogPayloads.clear(); - i = 0; - } - } - if (slowLogPayloads.size() > 0) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); - } - } finally { - LOCK.unlock(); - } - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java deleted file mode 100644 index f26ff51..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues.impl; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.SlowLogParams; -import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; -import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; -import org.apache.hadoop.hbase.namequeues.NamedQueueService; -import org.apache.hadoop.hbase.namequeues.RpcLogDetails; -import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; -import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; -import org.apache.hbase.thirdparty.com.google.common.collect.Queues; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.stream.Collectors; - -/** - * In-memory Queue service provider for Slow/LargeLog events - */ -@InterfaceAudience.Private -public class SlowLogQueueService implements NamedQueueService { - - private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class); - - private static final String SLOW_LOG_RING_BUFFER_SIZE = - "hbase.regionserver.slowlog.ringbuffer.size"; - - private final boolean isOnlineLogProviderEnabled; - private final boolean isSlowLogTableEnabled; - private final SlowLogPersistentService slowLogPersistentService; - private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue; - - public SlowLogQueueService(Configuration conf) { - this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, - HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - - if (!isOnlineLogProviderEnabled) { - this.isSlowLogTableEnabled = false; - this.slowLogPersistentService = null; - this.slowLogQueue = null; - return; - } - - // Initialize SlowLog Queue - int slowLogQueueSize = - conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); - - EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = - EvictingQueue.create(slowLogQueueSize); - slowLogQueue = Queues.synchronizedQueue(evictingQueue); - - this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, - HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); - if (isSlowLogTableEnabled) { - slowLogPersistentService = new SlowLogPersistentService(conf); - } else { - slowLogPersistentService = null; - } - } - - @Override - public NamedQueuePayload.NamedQueueEvent getEvent() { - return NamedQueuePayload.NamedQueueEvent.SLOW_LOG; - } - - /** - * This implementation is specific to slowLog event. This consumes slowLog event from - * disruptor and inserts records to EvictingQueue. - * - * @param namedQueuePayload namedQueue payload from disruptor ring buffer - */ - @Override - public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { - if (!isOnlineLogProviderEnabled) { - return; - } - if (!(namedQueuePayload instanceof RpcLogDetails)) { - LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails."); - return; - } - final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload; - final RpcCall rpcCall = rpcLogDetails.getRpcCall(); - final String clientAddress = rpcLogDetails.getClientAddress(); - final long responseSize = rpcLogDetails.getResponseSize(); - final String className = rpcLogDetails.getClassName(); - final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails); - if (type == null) { - return; - } - Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); - Message param = rpcLogDetails.getParam(); - long receiveTime = rpcCall.getReceiveTime(); - long startTime = rpcCall.getStartTime(); - long endTime = System.currentTimeMillis(); - int processingTime = (int) (endTime - startTime); - int qTime = (int) (startTime - receiveTime); - final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); - int numGets = 0; - int numMutations = 0; - int numServiceCalls = 0; - if (param instanceof ClientProtos.MultiRequest) { - ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; - for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { - for (ClientProtos.Action action : regionAction.getActionList()) { - if (action.hasMutation()) { - numMutations++; - } - if (action.hasGet()) { - numGets++; - } - if (action.hasServiceCall()) { - numServiceCalls++; - } - } - } - } - final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); - final String methodDescriptorName = - methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; - TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder() - .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") - .setClientAddress(clientAddress) - .setMethodName(methodDescriptorName) - .setMultiGets(numGets) - .setMultiMutations(numMutations) - .setMultiServiceCalls(numServiceCalls) - .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) - .setProcessingTime(processingTime) - .setQueueTime(qTime) - .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) - .setResponseSize(responseSize) - .setServerClass(className) - .setStartTime(startTime) - .setType(type) - .setUserName(userName) - .build(); - slowLogQueue.add(slowLogPayload); - if (isSlowLogTableEnabled) { - if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { - slowLogPersistentService.addToQueueForSysTable(slowLogPayload); - } - } - } - - @Override - public boolean clearNamedQueue() { - if (!isOnlineLogProviderEnabled) { - return false; - } - LOG.debug("Received request to clean up online slowlog buffer."); - slowLogQueue.clear(); - return true; - } - - @Override - public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { - if (!isOnlineLogProviderEnabled) { - return null; - } - final AdminProtos.SlowLogResponseRequest slowLogResponseRequest = - request.getSlowLogResponseRequest(); - final List<TooSlowLog.SlowLogPayload> slowLogPayloads; - if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG - .equals(slowLogResponseRequest.getLogType())) { - slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest); - } else { - slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest); - } - NamedQueueGetResponse response = new NamedQueueGetResponse(); - response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - response.setSlowLogPayloads(slowLogPayloads); - return response; - } - - private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { - final boolean isSlowLog = rpcCallDetails.isSlowLog(); - final boolean isLargeLog = rpcCallDetails.isLargeLog(); - final TooSlowLog.SlowLogPayload.Type type; - if (!isSlowLog && !isLargeLog) { - LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", - rpcCallDetails); - return null; - } - if (isSlowLog && isLargeLog) { - type = TooSlowLog.SlowLogPayload.Type.ALL; - } else if (isSlowLog) { - type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG; - } else { - type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG; - } - return type; - } - - /** - * Add all slowLog events to system table. This is only for slowLog event's persistence on - * system table. - */ - @Override - public void persistAll() { - if (!isOnlineLogProviderEnabled) { - return; - } - if (slowLogPersistentService != null) { - slowLogPersistentService.addAllLogsToSysTable(); - } - } - - private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads( - final AdminProtos.SlowLogResponseRequest request) { - List<TooSlowLog.SlowLogPayload> slowLogPayloadList = - Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter( - e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL - || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList()); - // latest slow logs first, operator is interested in latest records from in-memory buffer - Collections.reverse(slowLogPayloadList); - return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); - } - - private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads( - final AdminProtos.SlowLogResponseRequest request) { - List<TooSlowLog.SlowLogPayload> slowLogPayloadList = - Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter( - e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL - || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList()); - // latest large logs first, operator is interested in latest records from in-memory buffer - Collections.reverse(slowLogPayloadList); - return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java deleted file mode 100644 index 6e88bf4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues.request; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Request object to be used by ring buffer use-cases. Clients get records by sending - * this request object. - * For each ring buffer use-case, add request payload to this class, client should set - * namedQueueEvent based on use-case. - * Protobuf does not support inheritance, hence we need to work with - */ -@InterfaceAudience.Private -public class NamedQueueGetRequest { - - private AdminProtos.SlowLogResponseRequest slowLogResponseRequest; - private NamedQueuePayload.NamedQueueEvent namedQueueEvent; - - public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() { - return slowLogResponseRequest; - } - - public void setSlowLogResponseRequest( - AdminProtos.SlowLogResponseRequest slowLogResponseRequest) { - this.slowLogResponseRequest = slowLogResponseRequest; - } - - public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { - return namedQueueEvent; - } - - public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - this.namedQueueEvent = namedQueueEvent; - } - - @Override - public String toString() { - return new ToStringBuilder(this) - .append("slowLogResponseRequest", slowLogResponseRequest) - .append("namedQueueEvent", namedQueueEvent) - .toString(); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java deleted file mode 100644 index ee4ed43..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.namequeues.response; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; -import org.apache.yetus.audience.InterfaceAudience; -import java.util.List; - -/** - * Response object to be sent by namedQueue service back to caller - */ -@InterfaceAudience.Private -public class NamedQueueGetResponse { - - private List<TooSlowLog.SlowLogPayload> slowLogPayloads; - private NamedQueuePayload.NamedQueueEvent namedQueueEvent; - - public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() { - return slowLogPayloads; - } - - public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) { - this.slowLogPayloads = slowLogPayloads; - } - - public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { - return namedQueueEvent; - } - - public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - this.namedQueueEvent = namedQueueEvent; - } - - @Override - public String toString() { - return new ToStringBuilder(this) - .append("slowLogPayloads", slowLogPayloads) - .append("namedQueueEvent", namedQueueEvent) - .toString(); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2df4d83..99fddf6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; -import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; -import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -536,7 +536,7 @@ public class HRegionServer extends Thread implements /** * Provide online slow log responses from ringbuffer */ - private NamedQueueRecorder namedQueueRecorder = null; + private SlowLogRecorder slowLogRecorder; /** * True if this RegionServer is coming up in a cluster where there is no Master; @@ -595,12 +595,7 @@ public class HRegionServer extends Thread implements this.stopped = false; if (!(this instanceof HMaster)) { - final boolean isOnlineLogProviderEnabled = conf.getBoolean( - HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, - HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - if (isOnlineLogProviderEnabled) { - this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); - } + this.slowLogRecorder = new SlowLogRecorder(this.conf); } rpcServices = createRpcServices(); useThisHostnameInstead = getUseThisHostnameInstead(conf); @@ -1499,12 +1494,12 @@ public class HRegionServer extends Thread implements } /** - * get NamedQueue Provider to add different logs to ringbuffer + * get Online SlowLog Provider to add slow logs to ringbuffer * - * @return NamedQueueRecorder + * @return Online SlowLog Provider */ - public NamedQueueRecorder getNamedQueueRecorder() { - return this.namedQueueRecorder; + public SlowLogRecorder getSlowLogRecorder() { + return this.slowLogRecorder; } /* @@ -2096,7 +2091,7 @@ public class HRegionServer extends Thread implements if (isSlowLogTableEnabled) { // default chore duration: 10 min final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); - slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); + slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder); } // Create the thread to clean the moved regions list diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d49bf4c..56de741 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -109,9 +109,6 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterRpcServices; -import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; @@ -132,7 +129,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; -import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; @@ -1308,7 +1305,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); rpcServer.setRsRpcServices(this); if (!(rs instanceof HMaster)) { - rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder()); + rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder()); } scannerLeaseTimeoutPeriod = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, @@ -4074,39 +4071,28 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @QosPriority(priority = HConstants.ADMIN_QOS) public SlowLogResponses getSlowLogResponses(final RpcController controller, final SlowLogResponseRequest request) { - final NamedQueueRecorder namedQueueRecorder = - this.regionServer.getNamedQueueRecorder(); - final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); + final SlowLogRecorder slowLogRecorder = + this.regionServer.getSlowLogRecorder(); + final List<SlowLogPayload> slowLogPayloads; + slowLogPayloads = slowLogRecorder != null + ? slowLogRecorder.getSlowLogPayloads(request) + : Collections.emptyList(); SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() .addAllSlowLogPayloads(slowLogPayloads) .build(); return slowLogResponses; } - private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request, - NamedQueueRecorder namedQueueRecorder) { - if (namedQueueRecorder == null) { - return Collections.emptyList(); - } - List<SlowLogPayload> slowLogPayloads; - NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - namedQueueGetRequest.setSlowLogResponseRequest(request); - NamedQueueGetResponse namedQueueGetResponse = - namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); - slowLogPayloads = namedQueueGetResponse != null ? - namedQueueGetResponse.getSlowLogPayloads() : - Collections.emptyList(); - return slowLogPayloads; - } - @Override @QosPriority(priority = HConstants.ADMIN_QOS) public SlowLogResponses getLargeLogResponses(final RpcController controller, final SlowLogResponseRequest request) { - final NamedQueueRecorder namedQueueRecorder = - this.regionServer.getNamedQueueRecorder(); - final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); + final SlowLogRecorder slowLogRecorder = + this.regionServer.getSlowLogRecorder(); + final List<SlowLogPayload> slowLogPayloads; + slowLogPayloads = slowLogRecorder != null + ? slowLogRecorder.getLargeLogPayloads(request) + : Collections.emptyList(); SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() .addAllSlowLogPayloads(slowLogPayloads) .build(); @@ -4117,12 +4103,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @QosPriority(priority = HConstants.ADMIN_QOS) public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, final ClearSlowLogResponseRequest request) { - final NamedQueueRecorder namedQueueRecorder = - this.regionServer.getNamedQueueRecorder(); - boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) - .map(queueRecorder -> - queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) - .orElse(false); + final SlowLogRecorder slowLogRecorder = + this.regionServer.getSlowLogRecorder(); + boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder) + .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false); ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder() .setIsCleaned(slowLogsCleaned) .build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java similarity index 96% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java index fcaecc6..53a2ef1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import com.lmax.disruptor.ExceptionHandler; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java new file mode 100644 index 0000000..9c147e3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.SlowLogParams; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; +import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + +/** + * Event Handler run by disruptor ringbuffer consumer + */ +@InterfaceAudience.Private +class LogEventHandler implements EventHandler<RingBufferEnvelope> { + + private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); + + private static final String SYS_TABLE_QUEUE_SIZE = + "hbase.regionserver.slowlog.systable.queue.size"; + private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; + private static final int SYSTABLE_PUT_BATCH_SIZE = 100; + + private final Queue<SlowLogPayload> queueForRingBuffer; + private final Queue<SlowLogPayload> queueForSysTable; + private final boolean isSlowLogTableEnabled; + + private Configuration configuration; + + private static final ReentrantLock LOCK = new ReentrantLock(); + + LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) { + this.configuration = conf; + EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount); + queueForRingBuffer = Queues.synchronizedQueue(evictingQueue); + this.isSlowLogTableEnabled = isSlowLogTableEnabled; + if (isSlowLogTableEnabled) { + int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); + EvictingQueue<SlowLogPayload> evictingQueueForTable = + EvictingQueue.create(sysTableQueueSize); + queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); + } else { + queueForSysTable = null; + } + } + + /** + * Called when a publisher has published an event to the {@link RingBuffer} + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from + * the {@link RingBuffer} + * @throws Exception if the EventHandler would like the exception handled further up the chain + */ + @Override + public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) + throws Exception { + final RpcLogDetails rpcCallDetails = event.getPayload(); + final RpcCall rpcCall = rpcCallDetails.getRpcCall(); + final String clientAddress = rpcCallDetails.getClientAddress(); + final long responseSize = rpcCallDetails.getResponseSize(); + final String className = rpcCallDetails.getClassName(); + final SlowLogPayload.Type type = getLogType(rpcCallDetails); + if (type == null) { + return; + } + Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); + Message param = rpcCallDetails.getParam(); + long receiveTime = rpcCall.getReceiveTime(); + long startTime = rpcCall.getStartTime(); + long endTime = System.currentTimeMillis(); + int processingTime = (int) (endTime - startTime); + int qTime = (int) (startTime - receiveTime); + final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); + int numGets = 0; + int numMutations = 0; + int numServiceCalls = 0; + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; + for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { + for (ClientProtos.Action action : regionAction.getActionList()) { + if (action.hasMutation()) { + numMutations++; + } + if (action.hasGet()) { + numGets++; + } + if (action.hasServiceCall()) { + numServiceCalls++; + } + } + } + } + final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); + final String methodDescriptorName = + methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; + SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder() + .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") + .setClientAddress(clientAddress) + .setMethodName(methodDescriptorName) + .setMultiGets(numGets) + .setMultiMutations(numMutations) + .setMultiServiceCalls(numServiceCalls) + .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) + .setProcessingTime(processingTime) + .setQueueTime(qTime) + .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) + .setResponseSize(responseSize) + .setServerClass(className) + .setStartTime(startTime) + .setType(type) + .setUserName(userName) + .build(); + queueForRingBuffer.add(slowLogPayload); + if (isSlowLogTableEnabled) { + if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { + queueForSysTable.add(slowLogPayload); + } + } + } + + private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { + final boolean isSlowLog = rpcCallDetails.isSlowLog(); + final boolean isLargeLog = rpcCallDetails.isLargeLog(); + final SlowLogPayload.Type type; + if (!isSlowLog && !isLargeLog) { + LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", + rpcCallDetails); + return null; + } + if (isSlowLog && isLargeLog) { + type = SlowLogPayload.Type.ALL; + } else if (isSlowLog) { + type = SlowLogPayload.Type.SLOW_LOG; + } else { + type = SlowLogPayload.Type.LARGE_LOG; + } + return type; + } + + /** + * Cleans up slow log payloads + * + * @return true if slow log payloads are cleaned up, false otherwise + */ + boolean clearSlowLogs() { + if (LOG.isDebugEnabled()) { + LOG.debug("Received request to clean up online slowlog buffer.."); + } + queueForRingBuffer.clear(); + return true; + } + + /** + * Retrieve list of slow log payloads + * + * @param request slow log request parameters + * @return list of slow log payloads + */ + List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { + List<SlowLogPayload> slowLogPayloadList = + Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) + .filter(e -> e.getType() == SlowLogPayload.Type.ALL + || e.getType() == SlowLogPayload.Type.SLOW_LOG) + .collect(Collectors.toList()); + + // latest slow logs first, operator is interested in latest records from in-memory buffer + Collections.reverse(slowLogPayloadList); + + return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); + } + + /** + * Retrieve list of large log payloads + * + * @param request large log request parameters + * @return list of large log payloads + */ + List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { + List<SlowLogPayload> slowLogPayloadList = + Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) + .filter(e -> e.getType() == SlowLogPayload.Type.ALL + || e.getType() == SlowLogPayload.Type.LARGE_LOG) + .collect(Collectors.toList()); + + // latest large logs first, operator is interested in latest records from in-memory buffer + Collections.reverse(slowLogPayloadList); + + return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); + } + + /** + * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch + */ + void addAllLogsToSysTable() { + if (queueForSysTable == null) { + // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting. + return; + } + if (LOCK.isLocked()) { + return; + } + LOCK.lock(); + try { + List<SlowLogPayload> slowLogPayloads = new ArrayList<>(); + int i = 0; + while (!queueForSysTable.isEmpty()) { + slowLogPayloads.add(queueForSysTable.poll()); + i++; + if (i == SYSTABLE_PUT_BATCH_SIZE) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + slowLogPayloads.clear(); + i = 0; + } + } + if (slowLogPayloads.size() > 0) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + } + } finally { + LOCK.unlock(); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java similarity index 96% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java index f04cb18..f4d850f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import java.util.ArrayList; import java.util.List; @@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience; * Event Handler utility class */ @InterfaceAudience.Private -public class LogHandlerUtils { +class LogHandlerUtils { private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) { int totalFilters = 0; @@ -91,7 +91,7 @@ public class LogHandlerUtils { return filteredSlowLogPayloads; } - public static List<TooSlowLog.SlowLogPayload> getFilteredLogs( + static List<TooSlowLog.SlowLogPayload> getFilteredLogs( AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) { int totalFilters = getTotalFiltersCount(request); if (totalFilters > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java similarity index 68% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java index f93baaa..d308670 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java @@ -17,29 +17,29 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; /** - * An envelope to carry payload in the ring buffer that serves as online buffer - * to provide latest events + * An envelope to carry payload in the slow log ring buffer that serves as online buffer + * to provide latest TooSlowLog */ @InterfaceAudience.Private final class RingBufferEnvelope { - private NamedQueuePayload namedQueuePayload; + private RpcLogDetails rpcLogDetails; /** * Load the Envelope with {@link RpcCall} * - * @param namedQueuePayload all details of rpc call that would be useful for ring buffer + * @param rpcLogDetails all details of rpc call that would be useful for ring buffer * consumers */ - public void load(NamedQueuePayload namedQueuePayload) { - this.namedQueuePayload = namedQueuePayload; + public void load(RpcLogDetails rpcLogDetails) { + this.rpcLogDetails = rpcLogDetails; } /** @@ -48,10 +48,10 @@ final class RingBufferEnvelope { * * @return Retrieve rpc log details */ - public NamedQueuePayload getPayload() { - final NamedQueuePayload namedQueuePayload = this.namedQueuePayload; - this.namedQueuePayload = null; - return namedQueuePayload; + public RpcLogDetails getPayload() { + final RpcLogDetails rpcLogDetails = this.rpcLogDetails; + this.rpcLogDetails = null; + return rpcLogDetails; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java index 581d1a3..b469cdb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; @@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; * RpcCall details that would be passed on to ring buffer of slow log responses */ @InterfaceAudience.Private -public class RpcLogDetails extends NamedQueuePayload { +public class RpcLogDetails { private final RpcCall rpcCall; private final Message param; @@ -40,7 +40,6 @@ public class RpcLogDetails extends NamedQueuePayload { public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, String className, boolean isSlowLog, boolean isLargeLog) { - super(NamedQueueEvent.SLOW_LOG); this.rpcCall = rpcCall; this.param = param; this.clientAddress = clientAddress; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java similarity index 50% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java index cb3512a..b0fb3e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java @@ -17,80 +17,87 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import java.util.Collections; +import java.util.List; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + /** - * NamedQueue recorder that maintains various named queues. - * The service uses LMAX Disruptor to save queue records which are then consumed by + * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer. + * The service uses LMAX Disruptor to save slow records which are then consumed by * a queue and based on the ring buffer size, the available records are then fetched * from the queue in thread-safe manner. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class NamedQueueRecorder { +public class SlowLogRecorder { private final Disruptor<RingBufferEnvelope> disruptor; private final LogEventHandler logEventHandler; + private final int eventCount; + private final boolean isOnlineLogProviderEnabled; - private static NamedQueueRecorder namedQueueRecorder; - private static boolean isInit = false; - private static final Object LOCK = new Object(); + private static final String SLOW_LOG_RING_BUFFER_SIZE = + "hbase.regionserver.slowlog.ringbuffer.size"; /** * Initialize disruptor with configurable ringbuffer size */ - private NamedQueueRecorder(Configuration conf) { + public SlowLogRecorder(Configuration conf) { + isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + + if (!isOnlineLogProviderEnabled) { + this.disruptor = null; + this.logEventHandler = null; + this.eventCount = 0; + return; + } + + this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, + HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); // This is the 'writer' -- a single threaded executor. This single thread consumes what is // put on the ringbuffer. final String hostingThreadName = Thread.currentThread().getName(); - int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); - // disruptor initialization with BlockingWaitStrategy this.disruptor = new Disruptor<>(RingBufferEnvelope::new, - getEventCount(eventCount), + getEventCount(), Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"), ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); // initialize ringbuffer event handler - this.logEventHandler = new LogEventHandler(conf); + final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, + HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); + this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf); this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler}); this.disruptor.start(); } - public static NamedQueueRecorder getInstance(Configuration conf) { - if (namedQueueRecorder != null) { - return namedQueueRecorder; - } - synchronized (LOCK) { - if (!isInit) { - namedQueueRecorder = new NamedQueueRecorder(conf); - isInit = true; - } - } - return namedQueueRecorder; - } - // must be power of 2 for disruptor ringbuffer - private int getEventCount(int eventCount) { - Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0"); + private int getEventCount() { + Preconditions.checkArgument(eventCount >= 0, + SLOW_LOG_RING_BUFFER_SIZE + " must be > 0"); int floor = Integer.highestOneBit(eventCount); if (floor == eventCount) { return floor; @@ -103,53 +110,66 @@ public class NamedQueueRecorder { } /** - * Retrieve in memory queue records from ringbuffer + * Retrieve online slow logs from ringbuffer + * + * @param request slow log request parameters + * @return online slow logs from ringbuffer + */ + public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { + return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request) + : Collections.emptyList(); + } + + /** + * Retrieve online large logs from ringbuffer * - * @param request namedQueue request with event type - * @return queue records from ringbuffer after filter (if applied) + * @param request large log request parameters + * @return online large logs from ringbuffer */ - public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { - return this.logEventHandler.getNamedQueueRecords(request); + public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) { + return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request) + : Collections.emptyList(); } /** - * clears queue records from ringbuffer + * clears slow log payloads from ringbuffer * - * @param namedQueueEvent type of queue to clear * @return true if slow log payloads are cleaned up or * hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to * clean up slow logs */ - public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - return this.logEventHandler.clearNamedQueue(namedQueueEvent); + public boolean clearSlowLogPayloads() { + if (!isOnlineLogProviderEnabled) { + return true; + } + return this.logEventHandler.clearSlowLogs(); } /** - * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog), - * consumer of disruptor ringbuffer will have specific logic. - * This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder - * constructor. + * Add slow log rpcCall details to ringbuffer * - * @param namedQueuePayload namedQueue payload sent by client of ring buffer - * service + * @param rpcLogDetails all details of rpc call that would be useful for ring buffer + * consumers */ - public void addRecord(NamedQueuePayload namedQueuePayload) { + public void addSlowLogPayload(RpcLogDetails rpcLogDetails) { + if (!isOnlineLogProviderEnabled) { + return; + } RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); long seqId = ringBuffer.next(); try { - ringBuffer.get(seqId).load(namedQueuePayload); + ringBuffer.get(seqId).load(rpcLogDetails); } finally { ringBuffer.publish(seqId); } } /** - * Add all in memory queue records to system table. The implementors can use system table - * or direct HDFS file or ZK as persistence system. + * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch */ - public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + public void addAllLogsToSysTable() { if (this.logEventHandler != null) { - this.logEventHandler.persistAll(namedQueueEvent); + this.logEventHandler.addAllLogsToSysTable(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java similarity index 84% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java index bc892e3..77749f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java @@ -17,7 +17,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; @@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class); - private final NamedQueueRecorder namedQueueRecorder; + private final SlowLogRecorder slowLogRecorder; /** * Chore Constructor @@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore { * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will * cancel and cleanup * @param period Period in millis with which this Chore repeats execution when scheduled - * @param namedQueueRecorder {@link NamedQueueRecorder} instance + * @param slowLogRecorder {@link SlowLogRecorder} instance */ public SlowLogTableOpsChore(final Stoppable stopper, final int period, - final NamedQueueRecorder namedQueueRecorder) { + final SlowLogRecorder slowLogRecorder) { super("SlowLogTableOpsChore", stopper, period); - this.namedQueueRecorder = namedQueueRecorder; + this.slowLogRecorder = slowLogRecorder; } @Override @@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore { if (LOG.isTraceEnabled()) { LOG.trace("SlowLog Table Ops Chore is starting up."); } - namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + slowLogRecorder.addAllLogsToSysTable(); if (LOG.isTraceEnabled()) { LOG.trace("SlowLog Table Ops Chore is closing."); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java similarity index 76% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java index a914796..e08ad29 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java @@ -17,11 +17,10 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import java.io.IOException; import java.lang.reflect.Field; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -34,11 +33,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -63,11 +59,11 @@ public class TestSlowLogAccessor { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestSlowLogAccessor.class); - private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); + private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); - private NamedQueueRecorder namedQueueRecorder; + private SlowLogRecorder slowLogRecorder; @BeforeClass public static void setup() throws Exception { @@ -92,19 +88,9 @@ public class TestSlowLogAccessor { @Before public void setUp() throws Exception { HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0); - Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder"); + Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder"); slowLogRecorder.setAccessible(true); - this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer); - } - - private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads( - AdminProtos.SlowLogResponseRequest request) { - NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - namedQueueGetRequest.setSlowLogResponseRequest(request); - NamedQueueGetResponse namedQueueGetResponse = - namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); - return namedQueueGetResponse.getSlowLogPayloads(); + this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer); } @Test @@ -113,42 +99,42 @@ public class TestSlowLogAccessor { AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); - namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + slowLogRecorder.clearSlowLogPayloads(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); int i = 0; Connection connection = waitForSlowLogTableCreation(); // add 5 records initially for (; i < 5; i++) { - RpcLogDetails rpcLogDetails = TestNamedQueueRecorder + RpcLogDetails rpcLogDetails = TestSlowLogRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } // add 2 more records for (; i < 7; i++) { - RpcLogDetails rpcLogDetails = TestNamedQueueRecorder + RpcLogDetails rpcLogDetails = TestSlowLogRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } // add 3 more records for (; i < 10; i++) { - RpcLogDetails rpcLogDetails = TestNamedQueueRecorder + RpcLogDetails rpcLogDetails = TestSlowLogRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } // add 4 more records for (; i < 14; i++) { - RpcLogDetails rpcLogDetails = TestNamedQueueRecorder + RpcLogDetails rpcLogDetails = TestSlowLogRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY - .waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); + .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14)); @@ -184,10 +170,10 @@ public class TestSlowLogAccessor { public void testHigherSlowLogs() throws Exception { Connection connection = waitForSlowLogTableCreation(); - namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + slowLogRecorder.clearSlowLogPayloads(); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); for (int j = 0; j < 100; j++) { CompletableFuture.runAsync(() -> { @@ -195,15 +181,15 @@ public class TestSlowLogAccessor { if (i == 300) { Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); } - RpcLogDetails rpcLogDetails = TestNamedQueueRecorder + RpcLogDetails rpcLogDetails = TestSlowLogRecorder .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } }); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> { - int count = getSlowLogPayloads(request).size(); + int count = slowLogRecorder.getSlowLogPayloads(request).size(); LOG.debug("RingBuffer records count: {}", count); return count > 2000; })); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java similarity index 76% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java index 542efc3..f90bbc0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java @@ -17,14 +17,12 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.namequeues; +package org.apache.hadoop.hbase.regionserver.slowlog; import java.io.IOException; -import java.lang.reflect.Constructor; import java.net.InetAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,8 +35,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcCallback; -import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; -import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -65,25 +61,27 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo * Tests for Online SlowLog Provider Service */ @Category({MasterTests.class, MediumTests.class}) -public class TestNamedQueueRecorder { +public class TestSlowLogRecorder { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestNamedQueueRecorder.class); + HBaseClassTestRule.forClass(TestSlowLogRecorder.class); - private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); + private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); - private NamedQueueRecorder namedQueueRecorder; + private SlowLogRecorder slowLogRecorder; private static int i = 0; private static Configuration applySlowLogRecorderConf(int eventSize) { + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); return conf; + } /** @@ -92,10 +90,11 @@ public class TestNamedQueueRecorder { * * @param i index of ringbuffer logs * @param j data value that was put on index i - * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} + * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder} * @return if actual values are as per expectations */ private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { + boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); @@ -103,18 +102,15 @@ public class TestNamedQueueRecorder { } @Test - public void testOnlieSlowLogConsumption() throws Exception{ + public void testOnlieSlowLogConsumption() throws Exception { Configuration conf = applySlowLogRecorderConf(8); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); - namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + slowLogRecorder.clearSlowLogPayloads(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); int i = 0; @@ -123,12 +119,12 @@ public class TestNamedQueueRecorder { for (; i < 5; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } - Assert.assertNotEquals(-1, - HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5)); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); @@ -139,15 +135,15 @@ public class TestNamedQueueRecorder { for (; i < 7; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 7)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) && confirmPayloadParams(5, 2, slowLogPayloadsList) @@ -159,15 +155,15 @@ public class TestNamedQueueRecorder { for (; i < 10; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 8)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); // confirm ringbuffer is full return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) @@ -180,15 +176,15 @@ public class TestNamedQueueRecorder { for (; i < 14; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 8)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); // confirm ringbuffer is full // and ordered events return slowLogPayloadsList.size() == 8 @@ -199,14 +195,9 @@ public class TestNamedQueueRecorder { }) ); - AdminProtos.SlowLogResponseRequest largeLogRequest = - AdminProtos.SlowLogResponseRequest.newBuilder() - .setLimit(15) - .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) - .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); + List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request); // confirm ringbuffer is full // and ordered events return slowLogPayloadsList.size() == 8 @@ -219,12 +210,11 @@ public class TestNamedQueueRecorder { Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue( - NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); LOG.debug("cleared the ringbuffer of Online Slow Log records"); - List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); // confirm ringbuffer is empty return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; }) @@ -232,43 +222,30 @@ public class TestNamedQueueRecorder { } - private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { - NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - namedQueueGetRequest.setSlowLogResponseRequest(request); - NamedQueueGetResponse namedQueueGetResponse = - namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); - return namedQueueGetResponse == null ? - Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads(); - } - @Test public void testOnlineSlowLogWithHighRecords() throws Exception { Configuration conf = applySlowLogRecorderConf(14); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int i = 0; i < 14 * 11; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 14)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); // confirm strict order of slow log payloads return slowLogPayloads.size() == 14 @@ -289,37 +266,37 @@ public class TestNamedQueueRecorder { }) ); - boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue( - NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); Assert.assertTrue(isRingBufferCleaned); LOG.debug("cleared the ringbuffer of Online Slow Log records"); - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); // confirm ringbuffer is empty Assert.assertEquals(slowLogPayloads.size(), 0); + } @Test public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); + for (int i = 0; i < 300; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); return slowLogPayloads.size() == 0; }) ); @@ -328,58 +305,56 @@ public class TestNamedQueueRecorder { @Test public void testOnlineSlowLogWithDisableConfig() throws Exception { + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); + for (int i = 0; i < 300; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); return slowLogPayloads.size() == 0; }) ); conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); + } @Test public void testSlowLogFilters() throws Exception { Configuration conf = applySlowLogRecorderConf(30); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .setUserName("userName_87") .build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int i = 0; i < 100; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 1)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1)); AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -387,32 +362,25 @@ public class TestNamedQueueRecorder { .setClientAddress("client_85") .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(requestClient).size() == 1)); + () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1)); AdminProtos.SlowLogResponseRequest requestSlowLog = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(requestSlowLog).size() == 15)); + () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); + } @Test public void testConcurrentSlowLogEvents() throws Exception { Configuration conf = applySlowLogRecorderConf(50000); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); - AdminProtos.SlowLogResponseRequest largeLogRequest = - AdminProtos.SlowLogResponseRequest.newBuilder() - .setLimit(500000) - .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) - .build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); for (int j = 0; j < 1000; j++) { @@ -421,7 +389,7 @@ public class TestNamedQueueRecorder { for (int i = 0; i < 3500; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } }); @@ -429,24 +397,22 @@ public class TestNamedQueueRecorder { Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + slowLogRecorder.clearSlowLogPayloads(); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( - 5000, () -> getSlowLogPayloads(request).size() > 10000)); + 5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( - 5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); + 5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000)); } @Test public void testSlowLargeLogEvents() throws Exception { Configuration conf = applySlowLogRecorderConf(28); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); - + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); LOG.debug("Initially ringbuffer of Slow Log records is empty"); boolean isSlowLog; @@ -462,16 +428,16 @@ public class TestNamedQueueRecorder { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1), isSlowLog, isLargeLog); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 14)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); + List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); // confirm strict order of slow log payloads return slowLogPayloads.size() == 14 @@ -492,18 +458,12 @@ public class TestNamedQueueRecorder { }) ); - AdminProtos.SlowLogResponseRequest largeLogRequest = - AdminProtos.SlowLogResponseRequest.newBuilder() - .setLimit(14 * 11) - .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) - .build(); - Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(largeLogRequest).size() == 14)); + () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14)); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { - List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); + List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request); // confirm strict order of slow log payloads return largeLogPayloads.size() == 14 @@ -523,16 +483,14 @@ public class TestNamedQueueRecorder { && confirmPayloadParams(13, 128, largeLogPayloads); }) ); + } @Test public void testSlowLogMixedFilters() throws Exception { Configuration conf = applySlowLogRecorderConf(30); - Constructor<NamedQueueRecorder> constructor = - NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); - constructor.setAccessible(true); - namedQueueRecorder = constructor.newInstance(conf); + slowLogRecorder = new SlowLogRecorder(conf); AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) @@ -540,23 +498,23 @@ public class TestNamedQueueRecorder { .setClientAddress("client_88") .build(); - Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); for (int i = 0; i < 100; i++) { RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); - namedQueueRecorder.addRecord(rpcLogDetails); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); } Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(request).size() == 2)); + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2)); AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .setUserName("userName_1") .setClientAddress("client_2") .build(); - Assert.assertEquals(0, getSlowLogPayloads(request2).size()); + Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size()); AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -565,7 +523,7 @@ public class TestNamedQueueRecorder { .setClientAddress("client_88") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) .build(); - Assert.assertEquals(0, getSlowLogPayloads(request3).size()); + Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size()); AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -574,7 +532,7 @@ public class TestNamedQueueRecorder { .setClientAddress("client_87") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) .build(); - Assert.assertEquals(1, getSlowLogPayloads(request4).size()); + Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size()); AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() @@ -583,14 +541,14 @@ public class TestNamedQueueRecorder { .setClientAddress("client_89") .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR) .build(); - Assert.assertEquals(2, getSlowLogPayloads(request5).size()); + Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size()); AdminProtos.SlowLogResponseRequest requestSlowLog = AdminProtos.SlowLogResponseRequest.newBuilder() .setLimit(15) .build(); Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, - () -> getSlowLogPayloads(requestSlowLog).size() == 15)); + () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); } static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { @@ -804,20 +762,26 @@ public class TestNamedQueueRecorder { private static Optional<User> getUser(String userName) { return Optional.of(new User() { + + @Override public String getShortName() { return userName; } + @Override public <T> T runAs(PrivilegedAction<T> action) { return null; } + @Override - public <T> T runAs(PrivilegedExceptionAction<T> action) { + public <T> T runAs(PrivilegedExceptionAction<T> action) throws + IOException, InterruptedException { return null; } + }); }