This is an automated email from the ASF dual-hosted git repository. dhavalshah9131 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push: new f0ea284c5 RANGER-4196-Tomcat metrics collection code f0ea284c5 is described below commit f0ea284c5628ce30322245cbf07642cf5493b716 Author: Vikas Kumar <talktovikas...@gmail.com> AuthorDate: Thu Apr 20 19:59:38 2023 +0530 RANGER-4196-Tomcat metrics collection code --- .../ranger/server/tomcat/EmbeddedServer.java | 13 +++ .../tomcat/EmbeddedServerMetricsCollector.java | 128 +++++++++++++++++++++ ranger-metrics/pom.xml | 19 +++ .../ranger/metrics/RangerMetricsSystemWrapper.java | 2 + .../source/RangerMetricsContainerSource.java | 99 ++++++++++++++++ .../source/TestRangerMetricsContainerSource.java | 111 ++++++++++++++++++ 6 files changed, 372 insertions(+) diff --git a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java index cae9075a7..a0d616925 100644 --- a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java +++ b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java @@ -87,6 +87,8 @@ public class EmbeddedServer { public static final String RANGER_SSL_KEYMANAGER_ALGO_TYPE = KeyManagerFactory.getDefaultAlgorithm(); public static final String RANGER_SSL_TRUSTMANAGER_ALGO_TYPE = TrustManagerFactory.getDefaultAlgorithm(); + private static EmbeddedServerMetricsCollector serverMetricsCollector; + public static void main(String[] args) { new EmbeddedServer(args).start(); } @@ -342,6 +344,8 @@ public class EmbeddedServer { } } } + + serverMetricsCollector = new EmbeddedServerMetricsCollector(server); server.start(); server.getServer().await(); shutdownServer(); @@ -573,4 +577,13 @@ public class EmbeddedServer { } } + public static EmbeddedServerMetricsCollector getServerMetricsCollector(){ + + EmbeddedServerMetricsCollector embeddedServerMetricsCollector = EmbeddedServer.serverMetricsCollector; + if( null != embeddedServerMetricsCollector ){ + LOG.info("Selected Tomcat protocolHandler: "+ embeddedServerMetricsCollector.getProtocolHandlerName()); + } + return embeddedServerMetricsCollector; + } + } diff --git a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServerMetricsCollector.java b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServerMetricsCollector.java new file mode 100644 index 000000000..90617f456 --- /dev/null +++ b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServerMetricsCollector.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.server.tomcat; + +import org.apache.catalina.connector.Connector; +import org.apache.catalina.startup.Tomcat; +import org.apache.coyote.AbstractProtocol; + +import java.util.concurrent.Executor; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; + +public class EmbeddedServerMetricsCollector { + + private final Connector connector; + private final AbstractProtocol protocolHandler; + + EmbeddedServerMetricsCollector( Tomcat server){ + this.connector = server.getConnector(); + this.protocolHandler = (AbstractProtocol) this.connector.getProtocolHandler(); + } + + /** + * + * @return: maxConfigured (allowed) connections to be accepted by the server. + */ + public long getMaxAllowedConnection(){ + + return this.protocolHandler.getMaxConnections(); + } + + /** + * + * @return: Once maxConnection is reached, OS would still accept few more connections in a queue and size of queue is determined by "acceptCount" + * By default, it is 100. + * Note: These connections will wait in the queue for serverSocket to accept. + */ + public int getConnectionAcceptCount(){ + return this.protocolHandler.getAcceptCount(); + } + + /** + * + * @return: Returns the active connections count. + */ + public long getActiveConnectionCount(){ + return this.protocolHandler.getConnectionCount(); + } + + /** + * + * @return: Max container threads count + */ + public int getMaxContainerThreadsCount(){ + return this.protocolHandler.getMaxThreads(); + } + + /** + * + * @return: Returns the corePoolSize of threadpool + */ + public int getMinSpareContainerThreadsCount(){ + return this.protocolHandler.getMinSpareThreads(); + } + + /** + * + * @return: Returns the current active worked threads count. + * Note: {@link ThreadPoolExecutor#getActiveCount()} internally acquires lock, so it could be expensive. + */ + public int getActiveContainerThreadsCount(){ + Executor executor = this.protocolHandler.getExecutor(); + + int activeThreadCount = -1; + + if( executor instanceof ThreadPoolExecutor){ + + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + activeThreadCount = threadPoolExecutor.getActiveCount(); + } + + return activeThreadCount; + } + + public int getTotalContainerThreadsCount(){ + Executor executor = this.protocolHandler.getExecutor(); + + int totalThreadCount = -1; + + if( executor instanceof ThreadPoolExecutor){ + + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + totalThreadCount = threadPoolExecutor.getPoolSize(); + } + + return totalThreadCount; + } + + + public String getProtocolHandlerName(){ + return this.protocolHandler.getName(); + } + public long getConnectionTimeout(){ + + return this.protocolHandler.getConnectionTimeout(); + } + + public long getKeepAliveTimeout(){ + return this.protocolHandler.getKeepAliveTimeout(); + } + +} diff --git a/ranger-metrics/pom.xml b/ranger-metrics/pom.xml index 44602c3b8..1b0f860dc 100644 --- a/ranger-metrics/pom.xml +++ b/ranger-metrics/pom.xml @@ -45,11 +45,30 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.ranger</groupId> + <artifactId>embeddedwebserver</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- Test --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> </dependencies> </project> diff --git a/ranger-metrics/src/main/java/org/apache/ranger/metrics/RangerMetricsSystemWrapper.java b/ranger-metrics/src/main/java/org/apache/ranger/metrics/RangerMetricsSystemWrapper.java index cd806574d..ba40b1161 100644 --- a/ranger-metrics/src/main/java/org/apache/ranger/metrics/RangerMetricsSystemWrapper.java +++ b/ranger-metrics/src/main/java/org/apache/ranger/metrics/RangerMetricsSystemWrapper.java @@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.ranger.metrics.sink.RangerMetricsJsonSink; import org.apache.ranger.metrics.sink.RangerMetricsPrometheusSink; +import org.apache.ranger.metrics.source.RangerMetricsContainerSource; import org.apache.ranger.metrics.source.RangerMetricsJvmSource; import org.apache.ranger.metrics.wrapper.RangerMetricsSinkWrapper; import org.apache.ranger.metrics.wrapper.RangerMetricsSourceWrapper; @@ -61,6 +62,7 @@ public class RangerMetricsSystemWrapper { sourceWrappers = new ArrayList<RangerMetricsSourceWrapper>(); } sourceWrappers.add(new RangerMetricsSourceWrapper("RangerJVM", "Ranger common metric source (RangerMetricsJvmSource)", serviceName, new RangerMetricsJvmSource(serviceName))); + sourceWrappers.add(new RangerMetricsSourceWrapper("RangerContainer", "Ranger web container metric source (RangerMetricsContainerSource)", serviceName, new RangerMetricsContainerSource(serviceName))); for (RangerMetricsSourceWrapper sourceWrapper: sourceWrappers) { metricsSystem.register(sourceWrapper.getName(), sourceWrapper.getDescription(), sourceWrapper.getSource()); diff --git a/ranger-metrics/src/main/java/org/apache/ranger/metrics/source/RangerMetricsContainerSource.java b/ranger-metrics/src/main/java/org/apache/ranger/metrics/source/RangerMetricsContainerSource.java new file mode 100644 index 000000000..d380d9e66 --- /dev/null +++ b/ranger-metrics/src/main/java/org/apache/ranger/metrics/source/RangerMetricsContainerSource.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.metrics.source; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.ranger.metrics.RangerMetricsInfo; +import org.apache.ranger.server.tomcat.EmbeddedServer; +import org.apache.ranger.server.tomcat.EmbeddedServerMetricsCollector; +import org.apache.hadoop.metrics2.MetricsCollector; + +import java.util.Objects; + +public class RangerMetricsContainerSource extends RangerMetricsSource { + + private EmbeddedServerMetricsCollector embeddedServerMetricsCollector; + + private long maxConnections; + + private int acceptCount; + + private long activeConnectionsCount; + + private int maxContainersThreadCount; + + private int minSpareThreadsCount; + + private int activeContainerThreadsCount; + + private int totalContainerThreadsCount; + + private long connectionTimeout; + + private long keepAliveTimeout; + + private final String context; + + public RangerMetricsContainerSource(String context) { + this.context = context; + this.embeddedServerMetricsCollector = EmbeddedServer.getServerMetricsCollector(); + } + + @Override + protected void refresh() { + + if(Objects.nonNull(this.embeddedServerMetricsCollector)) + { + this.maxConnections = embeddedServerMetricsCollector.getMaxAllowedConnection(); + this.acceptCount = embeddedServerMetricsCollector.getConnectionAcceptCount(); + this.activeConnectionsCount = embeddedServerMetricsCollector.getActiveConnectionCount(); + this.maxContainersThreadCount = embeddedServerMetricsCollector.getMaxContainerThreadsCount(); + this.minSpareThreadsCount = embeddedServerMetricsCollector.getMinSpareContainerThreadsCount(); + this.activeContainerThreadsCount = embeddedServerMetricsCollector.getActiveContainerThreadsCount(); + this.connectionTimeout = embeddedServerMetricsCollector.getConnectionTimeout(); + this.keepAliveTimeout = embeddedServerMetricsCollector.getKeepAliveTimeout(); + this.totalContainerThreadsCount = embeddedServerMetricsCollector.getTotalContainerThreadsCount(); + } + + + } + + @Override + protected void update(MetricsCollector collector, boolean all) { + + collector.addRecord("RangerWebContainer") + .setContext(this.context) + .addCounter(new RangerMetricsInfo("MaxConnectionsCount", "Ranger max configured container connections"), this.maxConnections) + .addCounter(new RangerMetricsInfo("ActiveConnectionsCount", "Ranger active container connections"), this.activeConnectionsCount) + .addCounter(new RangerMetricsInfo("ConnectionAcceptCount", "Ranger accept connections count"), this.acceptCount) + .addCounter(new RangerMetricsInfo("ConnectionTimeout", "Ranger connection timeout"), this.connectionTimeout) + .addCounter(new RangerMetricsInfo("KeepAliveTimeout", "Ranger connection keepAlive timeout"), this.keepAliveTimeout) + .addCounter(new RangerMetricsInfo("MaxWorkerThreadsCount", "Ranger container worker threads count"), this.maxContainersThreadCount) + .addCounter(new RangerMetricsInfo("MinSpareWorkerThreadsCount", "Ranger container minimum spare worker threads count"), this.minSpareThreadsCount) + .addCounter(new RangerMetricsInfo("ActiveWorkerThreadsCount", "Ranger container active worker threads count"), this.activeContainerThreadsCount) + .addCounter(new RangerMetricsInfo("TotalWorkerThreadsCount", "Ranger container total worker threads count"), this.totalContainerThreadsCount); + } + + @VisibleForTesting + void setEmbeddedServerMetricsCollector( EmbeddedServerMetricsCollector embeddedServerMetricsCollector ){ + this.embeddedServerMetricsCollector = embeddedServerMetricsCollector; + } + +} diff --git a/ranger-metrics/src/test/java/org/apache/ranger/metrics/source/TestRangerMetricsContainerSource.java b/ranger-metrics/src/test/java/org/apache/ranger/metrics/source/TestRangerMetricsContainerSource.java new file mode 100644 index 000000000..15462cba5 --- /dev/null +++ b/ranger-metrics/src/test/java/org/apache/ranger/metrics/source/TestRangerMetricsContainerSource.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.metrics.source; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.ranger.metrics.RangerMetricsSystemWrapper; +import org.apache.ranger.server.tomcat.EmbeddedServer; +import org.apache.ranger.server.tomcat.EmbeddedServerMetricsCollector; +import org.junit.*; + +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestRangerMetricsContainerSource { + + private static final String CONTAINER_METRIC_SOURCE_NAME = "RangerContainer"; + private static RangerMetricsSystemWrapper rangerMetricsSystemWrapper; + private static EmbeddedServer tomcatServer; + + private EmbeddedServerMetricsCollector embeddedServerMetricsCollector; + + private static MetricsSystem metricsSystem; + + public TestRangerMetricsContainerSource(){ + } + + @BeforeClass + public static void init(){ + + metricsSystem = DefaultMetricsSystem.instance(); + TestRangerMetricsContainerSource.rangerMetricsSystemWrapper = new RangerMetricsSystemWrapper(); + TestRangerMetricsContainerSource.rangerMetricsSystemWrapper.init("test", null, (List)null); + } + + @AfterClass + public static void tearDown() + { + metricsSystem.shutdown(); + } + + // Without proper start of EmbeddedServer, embeddedServerMetricsCollector will be returned null. + // That's why, mocked instance should be injected here. + @Before + public void before(){ + embeddedServerMetricsCollector = mock(EmbeddedServerMetricsCollector.class); + ((RangerMetricsContainerSource)DefaultMetricsSystem.instance().getSource(CONTAINER_METRIC_SOURCE_NAME)).setEmbeddedServerMetricsCollector(embeddedServerMetricsCollector); + } + + // Resetting it back to original state. + @After + public void after(){ + ((RangerMetricsContainerSource)DefaultMetricsSystem.instance().getSource(CONTAINER_METRIC_SOURCE_NAME)).setEmbeddedServerMetricsCollector(null); + } + + + /* + * Test Case: + * This case verifies the tomcat metric collection when RangerMetricsContainerSource gets executed to collect the metrics. + * Mocking: Mocked the EmbeddedServerMetricsCollector as it gets initialised when Tomcat server starts. + * Simulated what to return when these APIs get called. + * Expected output: After metric collection through metric system, on fetching the json metrics it should return the stats. + * Note: DefaultMetricSystem is singleton and is being used by the RangerMetricsContainerSource + */ + + @Test + public void testContainerMetricsCollection(){ + + when(embeddedServerMetricsCollector.getActiveConnectionCount()).thenReturn(1L); + when(embeddedServerMetricsCollector.getMaxAllowedConnection()).thenReturn(8192L); + when(embeddedServerMetricsCollector.getConnectionAcceptCount()).thenReturn(100); + when(embeddedServerMetricsCollector.getMaxContainerThreadsCount()).thenReturn(200); + when(embeddedServerMetricsCollector.getMinSpareContainerThreadsCount()).thenReturn(10); + when(embeddedServerMetricsCollector.getActiveContainerThreadsCount()).thenReturn(2); + when(embeddedServerMetricsCollector.getConnectionTimeout()).thenReturn(60000L); + when(embeddedServerMetricsCollector.getKeepAliveTimeout()).thenReturn(60000L); + when(embeddedServerMetricsCollector.getTotalContainerThreadsCount()).thenReturn(15); + + metricsSystem.publishMetricsNow(); + + Assert.assertEquals(1L, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("ActiveConnectionsCount")); + Assert.assertEquals(60000L, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("ConnectionTimeout")); + Assert.assertEquals(200, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("MaxWorkerThreadsCount")); + Assert.assertEquals(15, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("TotalWorkerThreadsCount")); + Assert.assertEquals(60000L, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("KeepAliveTimeout")); + Assert.assertEquals(2, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("ActiveWorkerThreadsCount")); + Assert.assertEquals(100, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("ConnectionAcceptCount")); + Assert.assertEquals(10, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("MinSpareWorkerThreadsCount")); + Assert.assertEquals(8192L, rangerMetricsSystemWrapper.getRangerMetrics().get("RangerWebContainer").get("MaxConnectionsCount")); + + } + +}