This is an automated email from the ASF dual-hosted git repository. nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2b3b6df124dc93c7efd4dbdd32a92d87c2b8dcc1 Author: Wang gang <gwa...@ebay.com> AuthorDate: Fri Jun 19 10:31:14 2020 +0800 EBAY-KYLIN-1919 Add prometheus metrics reporter --- metrics-reporter-prometheus/pom.xml | 79 ++++ .../impl/prometheus/PrometheusEventProducer.java | 489 +++++++++++++++++++++ .../prometheus/PrometheusReservoirReporter.java | 140 ++++++ .../prometheus/PrometheusEventProducerTest.java | 207 +++++++++ .../PrometheusReservoirReporterTest.java | 86 ++++ pom.xml | 9 + server/src/main/resources/kylinMetrics.xml | 12 +- 7 files changed, 1020 insertions(+), 2 deletions(-) diff --git a/metrics-reporter-prometheus/pom.xml b/metrics-reporter-prometheus/pom.xml new file mode 100644 index 0000000..f1ac16c --- /dev/null +++ b/metrics-reporter-prometheus/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-metrics-reporter-prometheus</artifactId> + <packaging>jar</packaging> + <name>Apache Kylin - Metrics Reporter Prometheus</name> + <description>Apache Kylin - Metrics Reporter Prometheus</description> + + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>3.1.0</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-metadata</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-storage-hbase</artifactId> + </dependency> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_servlet</artifactId> + <version>${prometheus.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + + <!-- Env & Test --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4-rule-agent</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java 
b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java new file mode 100644 index 0000000..8784c18 --- /dev/null +++ b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
package org.apache.kylin.metrics.lib.impl.prometheus;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.metrics.lib.Record;
import org.apache.kylin.metrics.property.JobPropertyEnum;
import org.apache.kylin.metrics.property.QueryPropertyEnum;
import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.MetricsServlet;

/**
 * Translates Kylin metrics {@link Record}s into Prometheus counters/gauges and serves them over HTTP.
 *
 * <p>Construction starts an embedded Jetty server that exposes a {@link MetricsServlet} at
 * {@code /metrics} on {@code agent.host}:{@code agent.port} (port defaults to 1997), registers the
 * collectors declared below, and schedules {@link AppMetricsCollector} to sample JVM threads every
 * 60 seconds. {@link #close()} stops both the sampler and the server.
 *
 * <p>NOTE(review): collectors are registered with {@code register()} and no explicit registry, so
 * creating a second instance in the same JVM presumably fails on duplicate registration — confirm.
 */
public class PrometheusEventProducer {
    private static final Logger logger = LoggerFactory.getLogger(PrometheusEventProducer.class);

    /** Value a record's EXCEPTION field carries when no exception occurred. */
    private static final String NO_EXCEPTION_MARKER = "NULL";

    private static final long MILLIS_PER_SECOND = 1000L;

    private Server server;

    private Gauge queryLatencyGauge;
    private Gauge queryRPCLatencyGauge; //Only for RPC with exceptions
    private Counter queryCounter;
    private Gauge queryAvailabilityGauge;
    private Gauge slowQueryPercentage;
    private Counter badQueryCounter;
    private Counter slowQueryCounter;
    private Counter normalQueryCounter;
    private Counter queryHitCacheCounter;
    private Gauge queryHitCachePercentage;
    private Counter scanOutOfLimitExCounter;

    //end point rpc exception counter
    private Counter epRPCExceptionCounter;

    private Counter errorJobCounter;
    private Counter jobCounter;
    private Gauge errorJobPercentage;
    private Gauge jobDurationGauge;
    private Gauge buildDictDurationGauge;
    private Gauge distinctColDurationGauge;
    private Gauge convertHFileDurationGauge;
    private Gauge inmemCubingDurationGauge;
    private Gauge waitResDurationGauge;

    private Gauge totalThreadsGauge;
    private Gauge usedTomcatThreadsGauge;
    private Gauge usedQueryThreadsGauge;
    private Gauge usedCoprocessorThreadsGauge;
    private Gauge hbaseIPCThreadsGauge;
    private Gauge sharedHConnectionThreadsGauge;

    private final ScheduledExecutorService appMetricsCollectorExecutor;

    /**
     * Starts the HTTP endpoint, registers all collectors and schedules the thread sampler.
     *
     * @param props reporter properties; reads {@code agent.host} (falls back to the local host name
     *              when missing or empty) and {@code agent.port} (default {@code 1997})
     * @throws Exception if the host cannot be resolved, the port is not a number, or Jetty fails to start
     */
    public PrometheusEventProducer(Properties props) throws Exception {
        String host = props.getProperty("agent.host");
        // isEmpty(), not ==: '==' compares String identity, so an empty configured value slipped through.
        if (host == null || host.isEmpty()) {
            logger.warn("agent host not configured");
            host = InetAddress.getLocalHost().getHostName();
        }
        String port = props.getProperty("agent.port", "1997");
        start(host, Integer.parseInt(port));
        initMetrics();
        appMetricsCollectorExecutor = Executors
                .newSingleThreadScheduledExecutor(new NamedThreadFactory("metrics-collector"));
        appMetricsCollectorExecutor.scheduleAtFixedRate(new AppMetricsCollector(), 60, 60, TimeUnit.SECONDS);
    }

    /** Builds and registers every counter/gauge this producer exports. */
    private void initMetrics() {
        queryLatencyGauge = Gauge.build().name("query_latency").help("query latency")
                .labelNames("project_name", "cube_name").register();

        queryRPCLatencyGauge = Gauge.build().name("query_rpc_latency").help("query rpc latency")
                .labelNames("project_name", "cube_name").register();

        queryCounter = Counter.build().name("query_count").help("query count").labelNames("project_name", "cube_name")
                .register();

        scanOutOfLimitExCounter = Counter.build().name("scanoutoflimit_exception_count")
                .help("scan out of limit exception count").labelNames("project_name", "cube_name").register();

        // The gauge stores 1 - ratio (see addQueryMetrics), so the help text states that.
        queryAvailabilityGauge = Gauge.build().name("query_availability").help(
                "query availability: 1 - (count of HBase rpc exceptions, exclude ScanOutOfLimitException)/(query count)")
                .labelNames("project_name", "cube_name").register();

        badQueryCounter = Counter.build().name("bad_query_count").help("query with any exception count")
                .labelNames("project_name", "cube_name").register();

        slowQueryCounter = Counter.build().name("slow_query").help("query with long latency count")
                .labelNames("project_name", "cube_name").register();

        slowQueryPercentage = Gauge.build().name("slow_query_percentage")
                .help("slow query percentage: (slow query count)/(slow query and normal query count)")
                .labelNames("project_name", "cube_name").register();

        normalQueryCounter = Counter.build().name("normal_query_count").help("query with normal latency count")
                .labelNames("project_name", "cube_name").register();

        queryHitCacheCounter = Counter.build().name("query_hit_cache_count").help("query hit cache count")
                .labelNames("project_name", "cube_name").register();

        queryHitCachePercentage = Gauge.build().name("query_hit_cache_percentage").help("query hit cache percentage")
                .labelNames("project_name", "cube_name").register();

        epRPCExceptionCounter = Counter.build().name("endpoint_rpc_exception_count")
                .help("query with HBase coprocessor endpoint exception count, exclude ScanOutOfLimitException")
                .labelNames("project_name", "cube_name").register();

        errorJobCounter = Counter.build().name("error_job_count").help("error job count")
                .labelNames("project_name", "cube_name", "job_type").register();

        jobCounter = Counter.build().name("job_count").help("job count")
                .labelNames("project_name", "cube_name", "job_type").register();

        errorJobPercentage = Gauge.build().name("error_job_percentage").help("error job percentage")
                .labelNames("project_name", "cube_name", "job_type").register();

        jobDurationGauge = Gauge.build().name("cube_build_duration").help("build cube job duration")
                .labelNames("project_name", "cube_name", "job_type").register();

        buildDictDurationGauge = Gauge.build().name("build_dictionary_duration").help("build dictionary step duration")
                .labelNames("project_name", "cube_name").register();

        distinctColDurationGauge = Gauge.build().name("distinct_columns_duration")
                .help("get distinct columns step duration").labelNames("project_name", "cube_name").register();

        convertHFileDurationGauge = Gauge.build().name("convert_hfile_duration").help("convert HFile step duration")
                .labelNames("project_name", "cube_name").register();

        inmemCubingDurationGauge = Gauge.build().name("inmemory_cubing_duration").help("in-memory cubing step duration")
                .labelNames("project_name", "cube_name").register();

        waitResDurationGauge = Gauge.build().name("wait_resource_duration").help("job wait resource duration")
                .labelNames("project_name", "cube_name", "job_type").register();

        totalThreadsGauge = Gauge.build().name("total_thread_cnt").help("total threads count used by Kylin").register();

        usedTomcatThreadsGauge = Gauge.build().name("used_tomcat_thread_cnt")
                .help("currently used tomcat processing threads count").register();

        usedQueryThreadsGauge = Gauge.build().name("used_query_thread_cnt").help("currently used query threads count")
                .register();

        usedCoprocessorThreadsGauge = Gauge.build().name("used_coprocessor_thread_cnt")
                .help("currently used coprocessor threads count").register();

        hbaseIPCThreadsGauge = Gauge.build().name("ipc_client_thread_cnt").help("current ipc threads count").register();

        sharedHConnectionThreadsGauge = Gauge.build().name("shared_hconn_thread_cnt")
                .help("current shared hconnection threads count").register();
    }

    /**
     * Starts an embedded Jetty server that serves the Prometheus exposition format at {@code /metrics}.
     *
     * @param host interface/host name to bind
     * @param port TCP port to bind
     * @throws Exception if Jetty fails to start
     */
    private void start(String host, int port) throws Exception {
        InetSocketAddress socket = new InetSocketAddress(host, port);
        logger.debug("prometheus agent: {}:{}", host, port);
        server = new Server(socket);
        ServletContextHandler context = new ServletContextHandler();
        context.setContextPath("/");
        server.setHandler(context);
        context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
        server.start();
        logger.info("prometheus agent started!");
    }

    /** Feeds every record of {@code records} to {@link #add(Record)}. */
    public void add(Collection<Record> records) {
        for (Record record : records) {
            add(record);
        }
    }

    /**
     * Dispatches a record by its subject (case-insensitive) to the matching metrics updater.
     * Query-cube records are ignored because query records already cover them.
     */
    public void add(Record record) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();

        String type = record.getSubject();
        if (type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQueryRpcCall())) {
            addQueryRPCMetrics(record);
        } else if (type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQuery())) {
            addQueryMetrics(record);
        } else if (type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectJobException())) {
            addJobExceptionMetrics(record);
        } else if (type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectJob())) {
            addJobMetrics(record);
        } else if (type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQueryCube())) {
            //metric METRICS_QUERY already cover metric of query on cube, do nothing here
        } else {
            logger.warn("unknown metrics record type: {}", type);
        }
    }

    /** Records latency and endpoint-exception count for RPC calls that failed. */
    private void addQueryRPCMetrics(Record record) {
        Map<String, Object> kvs = record.getValueRaw();
        String prjName = String.valueOf(kvs.get(QueryRPCPropertyEnum.PROJECT.toString()));
        String cubeName = String.valueOf(kvs.get(QueryRPCPropertyEnum.REALIZATION.toString()));

        long callTime = toLong(kvs.get(QueryRPCPropertyEnum.CALL_TIME.toString()));
        Object exception = kvs.get(QueryRPCPropertyEnum.EXCEPTION.toString());
        if (hasException(exception)) {
            queryRPCLatencyGauge.labels(prjName, cubeName).set(callTime);
            if (!isResourceLimitExceeded(exception)) {
                logger.debug("endpoint rpc exception: {}", exception);
                epRPCExceptionCounter.labels(prjName, cubeName).inc();
            }
        }
    }

    /**
     * Updates query counters and derived ratios (availability, slow-query and cache-hit percentages)
     * for one query record.
     */
    private void addQueryMetrics(Record record) {
        Map<String, Object> kvs = record.getValueRaw();
        String prjName = String.valueOf(kvs.get(QueryPropertyEnum.PROJECT.toString()));
        String cubeName = String.valueOf(kvs.get(QueryPropertyEnum.REALIZATION.toString()));
        String type = String.valueOf(kvs.get(QueryPropertyEnum.TYPE.toString()));

        long queryLatency = toLong(kvs.get(QueryPropertyEnum.TIME_COST.toString()));
        queryLatencyGauge.labels(prjName, cubeName).set(queryLatency);

        Object exception = kvs.get(QueryPropertyEnum.EXCEPTION.toString());
        //both InternalErrorException and other exceptions mark as exception
        if (hasException(exception)) {
            badQueryCounter.labels(prjName, cubeName).inc();
            if (isResourceLimitExceeded(exception)) {
                scanOutOfLimitExCounter.labels(prjName, cubeName).inc();
            }
        } else if (queryLatency > slowQueryThresholdMillis()) {
            slowQueryCounter.labels(prjName, cubeName).inc();
        } else {
            normalQueryCounter.labels(prjName, cubeName).inc();
        }

        queryCounter.labels(prjName, cubeName).inc();
        if ("CACHE".equals(type)) {
            queryHitCacheCounter.labels(prjName, cubeName).inc();
        }

        // queryCount >= 1 here because it was just incremented.
        double queryCount = queryCounter.labels(prjName, cubeName).get();

        double epRPCExceptionCount = epRPCExceptionCounter.labels(prjName, cubeName).get();
        queryAvailabilityGauge.labels(prjName, cubeName).set(1 - epRPCExceptionCount / queryCount);

        // Skip when only failed queries have been seen, to avoid publishing 0/0 = NaN.
        double slowQueryCount = slowQueryCounter.labels(prjName, cubeName).get();
        double normalQueryCount = normalQueryCounter.labels(prjName, cubeName).get();
        double latencyClassified = slowQueryCount + normalQueryCount;
        if (latencyClassified > 0) {
            slowQueryPercentage.labels(prjName, cubeName).set(slowQueryCount / latencyClassified);
        }

        // Refresh on every query, not only on hits, so the ratio also drops when queries miss the cache.
        double queryHitCacheCount = queryHitCacheCounter.labels(prjName, cubeName).get();
        queryHitCachePercentage.labels(prjName, cubeName).set(queryHitCacheCount / queryCount);
    }

    //for success job, add duration metrics
    private void addJobMetrics(Record record) {
        Map<String, Object> kvs = record.getValueRaw();
        String prjName = String.valueOf(kvs.get(JobPropertyEnum.PROJECT.toString()));
        String cubeName = String.valueOf(kvs.get(JobPropertyEnum.CUBE.toString()));
        String jobType = String.valueOf(kvs.get(JobPropertyEnum.TYPE.toString()));

        long jobDuration = toLong(kvs.get(JobPropertyEnum.BUILD_DURATION.toString()));
        long waitResourceDuration = toLong(kvs.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString()));
        long buildDictDuration = toLong(kvs.get(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString()));
        long distinctColDuration = toLong(kvs.get(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString()));
        long convertHFileDuration = toLong(kvs.get(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString()));
        long inmemCubingDuration = toLong(kvs.get(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString()));

        jobDurationGauge.labels(prjName, cubeName, jobType).set(jobDuration);
        buildDictDurationGauge.labels(prjName, cubeName).set(buildDictDuration);
        distinctColDurationGauge.labels(prjName, cubeName).set(distinctColDuration);
        convertHFileDurationGauge.labels(prjName, cubeName).set(convertHFileDuration);
        inmemCubingDurationGauge.labels(prjName, cubeName).set(inmemCubingDuration);

        waitResDurationGauge.labels(prjName, cubeName, jobType).set(waitResourceDuration);
        jobCounter.labels(prjName, cubeName, jobType).inc();

        updateErrorJobPercentage(prjName, cubeName, jobType);
    }

    /** Counts a failed job (when the record carries an exception) and refreshes the error ratio. */
    private void addJobExceptionMetrics(Record record) {
        Map<String, Object> kvs = record.getValueRaw();
        String prjName = String.valueOf(kvs.get(JobPropertyEnum.PROJECT.toString()));
        String cubeName = String.valueOf(kvs.get(JobPropertyEnum.CUBE.toString()));
        String jobType = String.valueOf(kvs.get(JobPropertyEnum.TYPE.toString()));

        Object exception = kvs.get(JobPropertyEnum.EXCEPTION.toString());
        if (exception != null) {
            errorJobCounter.labels(prjName, cubeName, jobType).inc();
        }
        jobCounter.labels(prjName, cubeName, jobType).inc();

        // Both counters changed, so the ratio must be refreshed here too.
        updateErrorJobPercentage(prjName, cubeName, jobType);
    }

    /** Sets error_job_percentage = error jobs / all jobs; callers have just incremented jobCounter. */
    private void updateErrorJobPercentage(String prjName, String cubeName, String jobType) {
        double errorJobCount = errorJobCounter.labels(prjName, cubeName, jobType).get();
        double jobCount = jobCounter.labels(prjName, cubeName, jobType).get();
        errorJobPercentage.labels(prjName, cubeName, jobType).set(errorJobCount / jobCount);
    }

    /** True when an EXCEPTION field value denotes a real exception (absent or "NULL" means none). */
    private static boolean hasException(Object exception) {
        // equals(), not !=: reference comparison only worked when both sides happened to be interned.
        return exception != null && !NO_EXCEPTION_MARKER.equals(exception.toString());
    }

    /** True when the exception value names {@link ResourceLimitExceededException} (case-insensitive). */
    private static boolean isResourceLimitExceeded(Object exception) {
        return exception.toString().equalsIgnoreCase(ResourceLimitExceededException.class.getName());
    }

    /**
     * Converts the configured bad-query alerting threshold (seconds) to milliseconds.
     * NOTE(review): assumes TIME_COST is reported in milliseconds (test fixtures use ms-scale
     * durations) — confirm against the query metrics producer.
     */
    private static long slowQueryThresholdMillis() {
        return MILLIS_PER_SECOND * KylinConfig.getInstanceFromEnv().getBadQueryDefaultAlertingSeconds();
    }

    /** Parses a record value via its string form; throws NumberFormatException for non-numeric values. */
    private static long toLong(Object value) {
        return Long.parseLong(String.valueOf(value));
    }

    /**
     * Stops the periodic thread sampler and the embedded HTTP server.
     *
     * @throws Exception if Jetty fails to stop
     */
    public void close() throws Exception {
        // Without this the scheduled "metrics-collector" thread outlives the producer.
        appMetricsCollectorExecutor.shutdownNow();
        server.stop();
        logger.info("Prometheus agent closed!");
    }

    public long getQueryLatencyGauge(String prjName, String cubeName) {
        return (long) queryLatencyGauge.labels(prjName, cubeName).get();
    }

    public long getQueryRPCLatencyGauge(String prjName, String cubeName) {
        return (long) queryRPCLatencyGauge.labels(prjName, cubeName).get();
    }

    public long getQueryCounter(String prjName, String cubeName) {
        return (long) queryCounter.labels(prjName, cubeName).get();
    }

    public long getQueryAvailabilityGauge(String prjName, String cubeName) {
        return (long) queryAvailabilityGauge.labels(prjName, cubeName).get();
    }

    public long getSlowQueryPercentage(String prjName, String cubeName) {
        return (long) slowQueryPercentage.labels(prjName, cubeName).get();
    }

    public long getBadQueryCounter(String prjName, String cubeName) {
        return (long) badQueryCounter.labels(prjName, cubeName).get();
    }

    public long getSlowQueryCounter(String prjName, String cubeName) {
        return (long) slowQueryCounter.labels(prjName, cubeName).get();
    }

    public long getNormalQueryCounter(String prjName, String cubeName) {
        return (long) normalQueryCounter.labels(prjName, cubeName).get();
    }

    public long getQueryHitCacheCounter(String prjName, String cubeName) {
        return (long) queryHitCacheCounter.labels(prjName, cubeName).get();
    }

    public long getQueryHitCachePercentage(String prjName, String cubeName) {
        return (long) queryHitCachePercentage.labels(prjName, cubeName).get();
    }

    public long getScanOutOfLimitExCounter(String prjName, String cubeName) {
        return (long) scanOutOfLimitExCounter.labels(prjName, cubeName).get();
    }

    public long getEpRPCExceptionCounter(String prjName, String cubeName) {
        return (long) epRPCExceptionCounter.labels(prjName, cubeName).get();
    }

    public long getJobCounter(String prjName, String cubeName, String jobType) {
        return (long) jobCounter.labels(prjName, cubeName, jobType).get();
    }

    public long getErrorJobPercentage(String prjName, String cubeName, String jobType) {
        return (long) errorJobPercentage.labels(prjName, cubeName, jobType).get();
    }

    public long getErrorJobCounter(String prjName, String cubeName, String jobType) {
        return (long) errorJobCounter.labels(prjName, cubeName, jobType).get();
    }

    public long getJobDurationGauge(String prjName, String cubeName, String jobType) {
        return (long) jobDurationGauge.labels(prjName, cubeName, jobType).get();
    }

    public long getBuildDictDurationGauge(String prjName, String cubeName) {
        return (long) buildDictDurationGauge.labels(prjName, cubeName).get();
    }

    public long getDistinctColDurationGauge(String prjName, String cubeName) {
        return (long) distinctColDurationGauge.labels(prjName, cubeName).get();
    }

    public long getConvertHFileDurationGauge(String prjName, String cubeName) {
        return (long) convertHFileDurationGauge.labels(prjName, cubeName).get();
    }

    public long getInmemCubingDurationGauge(String prjName, String cubeName) {
        return (long) inmemCubingDurationGauge.labels(prjName, cubeName).get();
    }

    public long getWaitResDurationGauge(String prjName, String cubeName, String jobType) {
        return (long) waitResDurationGauge.labels(prjName, cubeName, jobType).get();
    }

    /**
     * Periodically classifies live JVM threads by name prefix and publishes the counts as gauges.
     * Errors are logged rather than thrown so the scheduled task keeps running.
     */
    private class AppMetricsCollector implements Runnable {

        @Override
        public void run() {
            try {
                // TODO collect tomcat information from tomcat JMX?
                ThreadInfo[] allThreads = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
                int usedTomcatThreadCnt = 0;
                int usedQueryThreadCnt = 0;
                int usedCoprocessorThreadCnt = 0;
                int totalThreadCnt = 0;
                int ipcThreadCnt = 0;
                int sharedHConnThreadCnt = 0;
                for (ThreadInfo threadInfo : allThreads) {
                    totalThreadCnt++;
                    String threadName = threadInfo.getThreadName();
                    if (threadName == null) {
                        continue;
                    }
                    // Query threads are also counted as tomcat threads.
                    if (isQueryThread(threadName)) {
                        usedQueryThreadCnt++;
                        usedTomcatThreadCnt++;
                        continue;
                    }
                    if (threadName.startsWith("http-bio-") && threadName.contains("exec")) {
                        if (isThreadRunningKylinCode(threadInfo)) {
                            usedTomcatThreadCnt++;
                        }
                        continue;
                    }
                    if (threadName.startsWith("kylin-coproc-") && isThreadRunningKylinCode(threadInfo)) {
                        usedCoprocessorThreadCnt++;
                        continue;
                    }
                    if (threadName.startsWith("IPC Client")) {
                        ipcThreadCnt++;
                        continue;
                    }
                    if (threadName.startsWith("hconnection-")) {
                        sharedHConnThreadCnt++;
                    }
                }
                usedQueryThreadsGauge.set(usedQueryThreadCnt);
                usedTomcatThreadsGauge.set(usedTomcatThreadCnt);
                usedCoprocessorThreadsGauge.set(usedCoprocessorThreadCnt);
                hbaseIPCThreadsGauge.set(ipcThreadCnt);
                sharedHConnectionThreadsGauge.set(sharedHConnThreadCnt);
                totalThreadsGauge.set(totalThreadCnt);
            } catch (Exception e) {
                logger.error("error when collect app metrics", e);
            }
        }

        /** A query thread is named "Query ..." and ends with "-<number>". */
        private boolean isQueryThread(String threadName) {
            if (!threadName.startsWith("Query ")) {
                return false;
            }
            try {
                Long.parseLong(threadName.substring(threadName.lastIndexOf('-') + 1)); // thread id
            } catch (Exception e) {
                return false;
            }
            return true;
        }

        /** True when any frame of the thread's stack belongs to a class whose name contains "kylin". */
        private boolean isThreadRunningKylinCode(ThreadInfo threadInfo) {
            StackTraceElement[] stackTraces = threadInfo.getStackTrace();
            for (StackTraceElement stackTrace : stackTraces) {
                if (stackTrace.getClassName().contains("kylin")) {
                    return true;
                }
            }
            return false;
        }
    }
}
diff --git
a/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java new file mode 100644 index 0000000..f4074ac --- /dev/null +++ b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.metrics.lib.impl.prometheus; + +import java.util.List; +import java.util.Properties; + +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.ReporterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PrometheusReservoirReporter extends ActiveReservoirReporter { + protected static final Logger logger = LoggerFactory.getLogger(PrometheusReservoirReporter.class); + public static final String PROMETHEUS_REPORTER_SUFFIX = "PROMETHEUS"; + private final ActiveReservoir activeReservoir; + private final PrometheusReservoirListener listener; + + public PrometheusReservoirReporter(ActiveReservoir activeReservoir, Properties props) throws Exception { + this.activeReservoir = activeReservoir; + this.listener = new PrometheusReservoirListener(props); + } + + /** + * Returns a new {@link Builder} for {@link PrometheusReservoirReporter}. + * + * @param activeReservoir the registry to report + * @return a {@link Builder} instance for a {@link PrometheusReservoirReporter} + */ + public static Builder forRegistry(ActiveReservoir activeReservoir) { + return new Builder(activeReservoir); + } + + /** + * Starts the reporter. + */ + public void start() { + activeReservoir.addListener(listener); + } + + /** + * Stops the reporter. + */ + public void stop() { + activeReservoir.removeListener(listener); + } + + /** + * Stops the reporter. + */ + @Override + public void close() { + stop(); + } + + protected PrometheusReservoirListener getListener() { + return listener; + } + + /** + * A builder for {@link PrometheusReservoirReporter} instances. 
+ */ + public static class Builder extends ReporterBuilder { + + private Builder(ActiveReservoir activeReservoir) { + super(activeReservoir); + } + + private void setFixedProperties() { + } + + /** + * Builds a {@link PrometheusReservoirReporter} with the given properties. + * + * @return a {@link PrometheusReservoirReporter} + */ + public PrometheusReservoirReporter build() throws Exception { + setFixedProperties(); + return new PrometheusReservoirReporter(registry, props); + } + } + + protected class PrometheusReservoirListener implements ActiveReservoirListener { + + private int nRecord = 0; + private int nRecordSkip = 0; + + PrometheusEventProducer producer; + + private PrometheusReservoirListener(Properties props) throws Exception { + producer = new PrometheusEventProducer(props); + } + + public boolean onRecordUpdate(final List<Record> records) { + try { + producer.add(records); + nRecord += records.size(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + nRecordSkip += records.size(); + return false; + } + return true; + } + + public void close() { + try { + producer.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + public int getNRecord() { + return nRecord; + } + + public int getNRecordSkip() { + return nRecordSkip; + } + } +} diff --git a/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java new file mode 100644 index 0000000..19d2c88 --- /dev/null +++ b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.prometheus; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exceptions.ResourceLimitExceededException; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; +import org.apache.kylin.metrics.property.JobPropertyEnum; +import org.apache.kylin.metrics.property.QueryCubePropertyEnum; +import org.apache.kylin.metrics.property.QueryPropertyEnum; +import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Tests for PrometheusEventProducer: adds mock metrics records via add(...) and asserts the values returned by its gauge, counter and percentage getters. */ public class PrometheusEventProducerTest { + + private static final String project = "default"; + private static final String cubeName = "ssb"; + + private PrometheusEventProducer producer; + + /** Points KYLIN_CONF at the local test metadata and creates a producer with empty Properties. */ @Before + public void setUp() throws Exception { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); + producer = new PrometheusEventProducer(new Properties()); + } + + /** Closes the producer and clears the KYLIN_CONF system property. */ @After + public void after() throws Exception { + producer.close(); + System.clearProperty(KylinConfig.KYLIN_CONF); + } + + /** Feeds query-RPC, query-cube, query, job, job-exception and "TEST"-subject records in stages, checking the producer's metric values after each stage. */ @Test + public void testAddRecord() throws Exception { + producer.add(mockQueryRpcCallEvent()); + assertEquals(0L, 
producer.getQueryRPCLatencyGauge(project, cubeName)); + producer.add(mockQueryRpcCallExceptionEvent()); + assertEquals(500L, producer.getQueryRPCLatencyGauge(project, cubeName)); + + producer.add(mockQueryCubeEvent()); + producer.add(mockQueryEvent()); + assertEquals(55L, producer.getQueryLatencyGauge(project, cubeName)); + assertEquals(0L, producer.getQueryAvailabilityGauge(project, cubeName)); + assertEquals(0L, producer.getSlowQueryPercentage(project, cubeName)); + assertEquals(0L, producer.getQueryHitCachePercentage(project, cubeName)); + + producer.add(mockJobEvent()); + assertEquals(7200000L, producer.getJobDurationGauge(project, cubeName, "BUILD")); + assertEquals(500000L, producer.getBuildDictDurationGauge(project, cubeName)); + assertEquals(2000000L, producer.getDistinctColDurationGauge(project, cubeName)); + assertEquals(1500000L, producer.getInmemCubingDurationGauge(project, cubeName)); + assertEquals(1000000L, producer.getConvertHFileDurationGauge(project, cubeName)); + assertEquals(300000L, producer.getWaitResDurationGauge(project, cubeName, "BUILD")); + assertEquals(0L, producer.getErrorJobPercentage(project, cubeName, "BUILD")); + producer.add(mockJobExceptionEvent()); + producer.add(mockTestEvent()); + + assertEquals(1L, producer.getQueryCounter(project, cubeName)); + assertEquals(0L, producer.getBadQueryCounter(project, cubeName)); + assertEquals(0L, producer.getSlowQueryCounter(project, cubeName)); + assertEquals(1L, producer.getNormalQueryCounter(project, cubeName)); + assertEquals(0L, producer.getQueryHitCacheCounter(project, cubeName)); + assertEquals(0L, producer.getScanOutOfLimitExCounter(project, cubeName)); + assertEquals(1L, producer.getEpRPCExceptionCounter(project, cubeName)); + assertEquals(2L, producer.getJobCounter(project, cubeName, "BUILD")); + assertEquals(1L, producer.getErrorJobCounter(project, cubeName, "BUILD")); + } + + /** Query-RPC record for the test project/cube with EXCEPTION "NULL" and CALL_TIME 50. */ private Record mockQueryRpcCallEvent() { + RecordEvent metricsEvent = new TimedRecordEvent( 
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); + metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), project); + metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), cubeName); + metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), "localhost"); + metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), "NULL"); + + metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 50L); + metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), 10L); + metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), 10L); + metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 0L); + return metricsEvent; + } + + /** Query-RPC record with CALL_TIME 500 whose EXCEPTION is ResourceLimitExceededException.class. NOTE(review): this stores a Class object while the other mocks store strings ("NULL" or a class name); confirm which form the producer expects. */ private Record mockQueryRpcCallExceptionEvent() { + RecordEvent metricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); + metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), project); + metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), cubeName); + metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), "localhost"); + metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), ResourceLimitExceededException.class); + + metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 500L); + metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), 0L); + metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 0L); + return metricsEvent; + } + + /** Query-cube record: segment 20130101000000_20131201000000, IF_MATCH and IF_SUCCESS true, one call with TIME_SUM 50. */ private Record mockQueryCubeEvent() { + RecordEvent metricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube()); + metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), project); + 
metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName); + metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), "20130101000000_20131201000000"); + metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), 255L); + metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), 255L); + metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), true); + metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), 0L); + + metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), 1L); + metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), 50L); + metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), 50L); + metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), 0L); + metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), 10L); + metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), 10L); + metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), 0L); + metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), 0L); + metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), true); + metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), 1.0); + return metricsEvent; + } + + /** OLAP query record by ADMIN with EXCEPTION "NULL" and TIME_COST 55. */ private Record mockQueryEvent() { + RecordEvent metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery()); + metricsEvent.put(QueryPropertyEnum.USER.toString(), "ADMIN"); + metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), "123456"); + metricsEvent.put(QueryPropertyEnum.TYPE.toString(), "OLAP"); + metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), project); + metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), cubeName); + metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), "Cube"); + metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(), "NULL"); + + metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), 55L); + metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), 10L); + 
metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), 10L); + metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), 0L); + return metricsEvent; + } + + /** BUILD job record with BUILD_DURATION 7200000, WAIT_RESOURCE_TIME 300000 and per-step durations. */ private Record mockJobEvent() { + RecordEvent metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()); + metricsEvent.put(JobPropertyEnum.USER.toString(), "ADMIN"); + metricsEvent.put(JobPropertyEnum.PROJECT.toString(), project); + metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName); + metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), "dfaf3fafaf"); + metricsEvent.put(JobPropertyEnum.TYPE.toString(), "BUILD"); + metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), "INMEM"); + + metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 1000000000L); + metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), 10000000L); + metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 7200000L); + metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 300000L); + metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0.0072); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), 2000000L); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 500000L); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 1500000L); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 1000000L); + return metricsEvent; + } + + /** BUILD job-exception record whose EXCEPTION is the AuthorizationException class name. */ private Record mockJobExceptionEvent() { + RecordEvent metricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()); + metricsEvent.put(JobPropertyEnum.USER.toString(), "ADMIN"); + metricsEvent.put(JobPropertyEnum.PROJECT.toString(), project); + metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName); + metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), "dfaf3fafaf"); + metricsEvent.put(JobPropertyEnum.TYPE.toString(), "BUILD"); + metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), 
"INMEM"); + metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), + "org.apache.hadoop.security.authorize.AuthorizationException"); + return metricsEvent; + } + + /** Record with subject "TEST" and no properties. */ private Record mockTestEvent() { + RecordEvent metricsEvent = new TimedRecordEvent("TEST"); + return metricsEvent; + } +} diff --git a/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java new file mode 100644 index 0000000..da1d5f9 --- /dev/null +++ b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.prometheus; + +import static org.junit.Assert.assertEquals; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.InstantReservoir; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.rule.PowerMockRule; + +import com.google.common.collect.Lists; + +/** Tests that the PrometheusReservoirReporter listener counts reservoir updates only while the reporter is started, and counts a record as skipped (not recorded) when the producer throws. */ @PrepareForTest({ PrometheusReservoirReporter.PrometheusReservoirListener.class }) +public class PrometheusReservoirReporterTest { + + @Rule + public PowerMockRule rule = new PowerMockRule(); + + private PrometheusReservoirReporter prometheusReporter; + private ActiveReservoir reservoir; + + /** Stubs construction of PrometheusEventProducer to return a PowerMock mock, then starts an InstantReservoir and builds a reporter for it. */ @Before + public void setUp() throws Exception { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); + + PrometheusEventProducer producer = PowerMockito.mock(PrometheusEventProducer.class); + PowerMockito.whenNew(PrometheusEventProducer.class).withAnyArguments().thenReturn(producer); + + reservoir = new InstantReservoir(); + reservoir.start(); + prometheusReporter = PrometheusReservoirReporter.forRegistry(reservoir).build(); + } + + /** Clears the KYLIN_CONF system property. NOTE(review): the reservoir started in setUp and the reporter started in testUpdate are not stopped here; confirm whether they should be. */ @After + public void after() throws Exception { + System.clearProperty(KylinConfig.KYLIN_CONF); + } + + /** Checks the listener's record count before start, while started, after stop, and after a restart in which the mocked producer throws on add. */ @Test + public void testUpdate() throws Exception { + Record record = new RecordEvent("TEST"); + reservoir.update(record); + assertEquals(0, prometheusReporter.getListener().getNRecord()); + + prometheusReporter.start(); + reservoir.update(record); + reservoir.update(record); + assertEquals(2, prometheusReporter.getListener().getNRecord()); + + prometheusReporter.stop(); + reservoir.update(record); + assertEquals(2, 
prometheusReporter.getListener().getNRecord()); + + prometheusReporter.start(); + PowerMockito.doThrow(new RuntimeException()).when(prometheusReporter.getListener().producer) + .add(Lists.newArrayList(record)); + reservoir.update(record); + assertEquals(2, prometheusReporter.getListener().getNRecord()); + assertEquals(1, prometheusReporter.getListener().getNRecordSkip()); + } +} diff --git a/pom.xml b/pom.xml index 916ad18..ed96e06 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,9 @@ <!-- Kafka versions --> <kafka.version>2.1.1</kafka.version> + <!-- Prometheus versions --> + <prometheus.version>0.0.21</prometheus.version> + <!-- Spark versions --> <spark.version>2.4.5</spark.version> <kryo.version>4.0.0</kryo.version> @@ -273,6 +276,11 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> + <artifactId>kylin-metrics-reporter-prometheus</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> <artifactId>kylin-core-metadata</artifactId> <version>${project.version}</version> </dependency> @@ -1387,6 +1395,7 @@ <module>core-metrics</module> <module>metrics-reporter-hive</module> <module>metrics-reporter-kafka</module> + <module>metrics-reporter-prometheus</module> <module>cache</module> <module>cube-migration</module> <module>datasource-sdk</module> diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml index 2b89553..5a31d3b 100644 --- a/server/src/main/resources/kylinMetrics.xml +++ b/server/src/main/resources/kylinMetrics.xml @@ -51,9 +51,9 @@ <list> <ref bean="hiveSink"/> <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" value-type="java.util.List"> - <!-- <entry key-ref="instantReservoir"> <list> + <!-- <bean class="org.apache.kylin.common.util.Pair"> <property name="first" value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/> @@ -63,9 +63,17 @@ </props> </property> </bean> + --> + <bean 
class="org.apache.kylin.common.util.Pair"> + <property name="first" + value="org.apache.kylin.metrics.lib.impl.prometheus.PrometheusReservoirReporter"/> + <property name="second"> + <props> + </props> + </property> + </bean> </list> </entry> - --> <entry key-ref="blockingReservoir"> <list> <bean class="org.apache.kylin.common.util.Pair">