This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2b3b6df124dc93c7efd4dbdd32a92d87c2b8dcc1
Author: Wang gang <gwa...@ebay.com>
AuthorDate: Fri Jun 19 10:31:14 2020 +0800

    EBAY-KYLIN-1919 Add prometheus metrics reporter
---
 metrics-reporter-prometheus/pom.xml                |  79 ++++
 .../impl/prometheus/PrometheusEventProducer.java   | 489 +++++++++++++++++++++
 .../prometheus/PrometheusReservoirReporter.java    | 140 ++++++
 .../prometheus/PrometheusEventProducerTest.java    | 207 +++++++++
 .../PrometheusReservoirReporterTest.java           |  86 ++++
 pom.xml                                            |   9 +
 server/src/main/resources/kylinMetrics.xml         |  12 +-
 7 files changed, 1020 insertions(+), 2 deletions(-)

diff --git a/metrics-reporter-prometheus/pom.xml 
b/metrics-reporter-prometheus/pom.xml
new file mode 100644
index 0000000..f1ac16c
--- /dev/null
+++ b/metrics-reporter-prometheus/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-metrics-reporter-prometheus</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Metrics Reporter Prometheus</name>
+    <description>Apache Kylin - Metrics Reporter Prometheus</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>3.1.0</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metadata</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-hbase</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_servlet</artifactId>
+            <version>${prometheus.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+        </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java
 
b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java
new file mode 100644
index 0000000..8784c18
--- /dev/null
+++ 
b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducer.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.metrics.lib.impl.prometheus;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.property.JobPropertyEnum;
+import org.apache.kylin.metrics.property.QueryPropertyEnum;
+import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.apache.kylin.stream.core.util.NamedThreadFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.MetricsServlet;
+
+public class PrometheusEventProducer {
+    private static final Logger logger = 
LoggerFactory.getLogger(PrometheusEventProducer.class);
+    private Server server;
+
+    private Gauge queryLatencyGauge;
+    private Gauge queryRPCLatencyGauge; //Only for RPC with exceptions
+    private Counter queryCounter;
+    private Gauge queryAvailabilityGauge;
+    private Gauge slowQueryPercentage;
+    private Counter badQueryCounter;
+    private Counter slowQueryCounter;
+    private Counter normalQueryCounter;
+    private Counter queryHitCacheCounter;
+    private Gauge queryHitCachePercentage;
+    private Counter scanOutOfLimitExCounter;
+
+    //end point rpc exception counter
+    private Counter epRPCExceptionCounter;
+
+    private Counter errorJobCounter;
+    private Counter jobCounter;
+    private Gauge errorJobPercentage;
+    private Gauge jobDurationGauge;
+    private Gauge buildDictDurationGauge;
+    private Gauge distinctColDurationGauge;
+    private Gauge convertHFileDurationGauge;
+    private Gauge inmemCubingDurationGauge;
+    private Gauge waitResDurationGauge;
+
+    private Gauge totalThreadsGauge;
+    private Gauge usedTomcatThreadsGauge;
+    private Gauge usedQueryThreadsGauge;
+    private Gauge usedCoprocessorThreadsGauge;
+    private Gauge hbaseIPCThreadsGauge;
+    private Gauge sharedHConnectionThreadsGauge;
+
+    private ScheduledExecutorService appMetricsCollectorExecutor;
+
+    public PrometheusEventProducer(Properties props) throws Exception {
+        String host = props.getProperty("agent.host");
+        if (host == null || host == "") {
+            logger.warn("agent host not configured");
+            host = InetAddress.getLocalHost().getHostName();
+        }
+        String port = props.getProperty("agent.port", "1997");
+        start(host, Integer.valueOf(port));
+        initMetrics();
+        appMetricsCollectorExecutor = Executors
+                .newSingleThreadScheduledExecutor(new 
NamedThreadFactory("metrics-collector"));
+        appMetricsCollectorExecutor.scheduleAtFixedRate(new 
AppMetricsCollector(), 60, 60, TimeUnit.SECONDS);
+    }
+
+    private void initMetrics() {
+        queryLatencyGauge = Gauge.build().name("query_latency").help("query 
latency")
+                .labelNames("project_name", "cube_name").register();
+
+        queryRPCLatencyGauge = 
Gauge.build().name("query_rpc_latency").help("query rpc latency")
+                .labelNames("project_name", "cube_name").register();
+
+        queryCounter = Counter.build().name("query_count").help("query 
count").labelNames("project_name", "cube_name")
+                .register();
+
+        scanOutOfLimitExCounter = 
Counter.build().name("scanoutoflimit_exception_count")
+                .help("scan out of limit exception 
count").labelNames("project_name", "cube_name").register();
+
+        queryAvailabilityGauge = Gauge.build().name("query_availability").help(
+                "query availability: (count of queries with HBase rpc 
exception, exclude ScanOutOfLimitException)/(query count)")
+                .labelNames("project_name", "cube_name").register();
+
+        badQueryCounter = Counter.build().name("bad_query_count").help("query 
with any exception count")
+                .labelNames("project_name", "cube_name").register();
+
+        slowQueryCounter = Counter.build().name("slow_query").help("query with 
long latency count")
+                .labelNames("project_name", "cube_name").register();
+
+        slowQueryPercentage = Gauge.build().name("slow_query_percentage")
+                .help("slow query percentage: (slow query count)/(slow query 
and normal query count)")
+                .labelNames("project_name", "cube_name").register();
+
+        normalQueryCounter = 
Counter.build().name("normal_query_count").help("query with normal latency 
count")
+                .labelNames("project_name", "cube_name").register();
+
+        queryHitCacheCounter = 
Counter.build().name("query_hit_cache_count").help("query hit cache count")
+                .labelNames("project_name", "cube_name").register();
+
+        queryHitCachePercentage = 
Gauge.build().name("query_hit_cache_percentage").help("query hit cache 
percentage")
+                .labelNames("project_name", "cube_name").register();
+
+        epRPCExceptionCounter = 
Counter.build().name("endpoint_rpc_exception_count")
+                .help("query with HBase coprocessor endpoint exception count, 
exclude ScanOutOfLimitException")
+                .labelNames("project_name", "cube_name").register();
+
+        errorJobCounter = Counter.build().name("error_job_count").help("error 
job count")
+                .labelNames("project_name", "cube_name", 
"job_type").register();
+
+        jobCounter = Counter.build().name("job_count").help("job count")
+                .labelNames("project_name", "cube_name", 
"job_type").register();
+
+        errorJobPercentage = 
Gauge.build().name("error_job_percentage").help("error job percentage")
+                .labelNames("project_name", "cube_name", 
"job_type").register();
+
+        jobDurationGauge = 
Gauge.build().name("cube_build_duration").help("build cube job duration")
+                .labelNames("project_name", "cube_name", 
"job_type").register();
+
+        buildDictDurationGauge = 
Gauge.build().name("build_dictionary_duration").help("build dictionary step 
duration")
+                .labelNames("project_name", "cube_name").register();
+
+        distinctColDurationGauge = 
Gauge.build().name("distinct_columns_duration")
+                .help("get distinct columns step 
duration").labelNames("project_name", "cube_name").register();
+
+        convertHFileDurationGauge = 
Gauge.build().name("convert_hfile_duration").help("convert HFile step duration")
+                .labelNames("project_name", "cube_name").register();
+
+        inmemCubingDurationGauge = 
Gauge.build().name("inmemory_cubing_duration").help("in-memory cubing step 
duration")
+                .labelNames("project_name", "cube_name").register();
+
+        waitResDurationGauge = 
Gauge.build().name("wait_resource_duration").help("job wait resource duration")
+                .labelNames("project_name", "cube_name", 
"job_type").register();
+
+        totalThreadsGauge = Gauge.build().name("total_thread_cnt").help("total 
threads count used by Kylin").register();
+
+        usedTomcatThreadsGauge = Gauge.build().name("used_tomcat_thread_cnt")
+                .help("currently used tomcat processing threads 
count").register();
+
+        usedQueryThreadsGauge = 
Gauge.build().name("used_query_thread_cnt").help("currently used query threads 
count")
+                .register();
+
+        usedCoprocessorThreadsGauge = 
Gauge.build().name("used_coprocessor_thread_cnt")
+                .help("currently used coprocessor threads count").register();
+
+        hbaseIPCThreadsGauge = 
Gauge.build().name("ipc_client_thread_cnt").help("current ipc threads 
count").register();
+
+        sharedHConnectionThreadsGauge = 
Gauge.build().name("shared_hconn_thread_cnt")
+                .help("current shared hconnection threads count").register();
+    }
+
+    private void start(String host, int port) throws Exception {
+        InetSocketAddress socket = new InetSocketAddress(host, port);
+        logger.debug("prometheus agent: " + host + ":" + port);
+        server = new Server(socket);
+        ServletContextHandler context = new ServletContextHandler();
+        context.setContextPath("/");
+        server.setHandler(context);
+        context.addServlet(new ServletHolder(new MetricsServlet()), 
"/metrics");
+        server.start();
+        logger.info("prometheus agent started!");
+    }
+
+    public void add(Collection<Record> records) {
+        for (Record record : records) {
+            add(record);
+        }
+    }
+
+    public void add(Record record) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        String type = record.getSubject();
+        if 
(type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQueryRpcCall())) {
+            addQueryRPCMetrics(record);
+        } else if 
(type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQuery())) {
+            addQueryMetrics(record);
+        } else if 
(type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectJobException())) {
+            addJobExceptionMetrics(record);
+        } else if 
(type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectJob())) {
+            addJobMetrics(record);
+        } else if 
(type.equalsIgnoreCase(kylinConfig.getKylinMetricsSubjectQueryCube())) {
+            //metric METRICS_QUERY already cover metric of query on cube, do 
nothing here
+        } else {
+            logger.warn("unknown metrics record type.");
+        }
+    }
+
+    private void addQueryRPCMetrics(Record record) {
+        Map<String, Object> kvs = record.getValueRaw();
+        String prjName = 
String.valueOf(kvs.get(QueryRPCPropertyEnum.PROJECT.toString()));
+        String cubeName = 
String.valueOf(kvs.get(QueryRPCPropertyEnum.REALIZATION.toString()));
+
+        long callTime = 
Long.valueOf(String.valueOf(kvs.get(QueryRPCPropertyEnum.CALL_TIME.toString())));
+        Object exception = kvs.get(QueryRPCPropertyEnum.EXCEPTION.toString());
+        if (exception.toString() != "NULL") {
+            queryRPCLatencyGauge.labels(prjName, cubeName).set(callTime);
+            if 
(!exception.toString().equalsIgnoreCase(ResourceLimitExceededException.class.getName()))
 {
+                logger.debug("endpoint rpc exception: " + 
exception.toString());
+                epRPCExceptionCounter.labels(prjName, cubeName).inc();
+            }
+        }
+    }
+
+    private void addQueryMetrics(Record record) {
+        Map<String, Object> kvs = record.getValueRaw();
+        String prjName = 
String.valueOf(kvs.get(QueryPropertyEnum.PROJECT.toString()));
+        String cubeName = 
String.valueOf(kvs.get(QueryPropertyEnum.REALIZATION.toString()));
+        String type = 
String.valueOf(kvs.get(QueryPropertyEnum.TYPE.toString()));
+
+        long queryLatency = 
Long.valueOf(String.valueOf(kvs.get(QueryPropertyEnum.TIME_COST.toString())));
+        queryLatencyGauge.labels(prjName, cubeName).set(queryLatency);
+
+        Object exception = kvs.get(QueryPropertyEnum.EXCEPTION.toString());
+        //both InternalErrorException and other exceptions mark as exception
+        if (exception.toString() != "NULL") {
+            badQueryCounter.labels(prjName, cubeName).inc();
+            if 
(exception.toString().equalsIgnoreCase(ResourceLimitExceededException.class.getName()))
 {
+                scanOutOfLimitExCounter.labels(prjName, cubeName).inc();
+            }
+        } else {
+            if (queryLatency > 
Long.valueOf(KylinConfig.getInstanceFromEnv().getBadQueryDefaultAlertingSeconds()))
 {
+                slowQueryCounter.labels(prjName, cubeName).inc();
+            } else {
+                normalQueryCounter.labels(prjName, cubeName).inc();
+            }
+        }
+
+        queryCounter.labels(prjName, cubeName).inc();
+
+        double epRPCExceptionCount = epRPCExceptionCounter.labels(prjName, 
cubeName).get();
+        double queryCount = queryCounter.labels(prjName, cubeName).get();
+        queryAvailabilityGauge.labels(prjName, cubeName).set(1 - 
epRPCExceptionCount / queryCount);
+
+        double slowQueryCount = slowQueryCounter.labels(prjName, 
cubeName).get();
+        double normalQueryCount = normalQueryCounter.labels(prjName, 
cubeName).get();
+        slowQueryPercentage.labels(prjName, cubeName).set(slowQueryCount / 
(slowQueryCount + normalQueryCount));
+
+        if (type != null && type.equals("CACHE")) {
+            queryHitCacheCounter.labels(prjName, cubeName).inc();
+            double queryHitCacheCount = queryHitCacheCounter.labels(prjName, 
cubeName).get();
+            double queryCount2 = queryCounter.labels(prjName, cubeName).get();
+            queryHitCachePercentage.labels(prjName, 
cubeName).set(queryHitCacheCount / queryCount2);
+        }
+    }
+
+    //for success job, add duration metrics
+    private void addJobMetrics(Record record) {
+        Map<String, Object> kvs = record.getValueRaw();
+        String prjName = 
String.valueOf(kvs.get(JobPropertyEnum.PROJECT.toString()));
+        String cubeName = 
String.valueOf(kvs.get(JobPropertyEnum.CUBE.toString()));
+        String jobType = 
String.valueOf(kvs.get(JobPropertyEnum.TYPE.toString()));
+
+        long jobDuration = 
Long.valueOf(String.valueOf(kvs.get(JobPropertyEnum.BUILD_DURATION.toString())));
+        long waitResourceDuration = Long
+                
.valueOf(String.valueOf(kvs.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString())));
+        long buildDictDuration = Long
+                
.valueOf(String.valueOf(kvs.get(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString())));
+        long distinctColDuration = Long
+                
.valueOf(String.valueOf(kvs.get(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString())));
+        long convertHFileDuration = Long
+                
.valueOf(String.valueOf(kvs.get(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString())));
+        long inmemCubingDuration = Long
+                
.valueOf(String.valueOf(kvs.get(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString())));
+
+        jobDurationGauge.labels(prjName, cubeName, jobType).set(jobDuration);
+        buildDictDurationGauge.labels(prjName, 
cubeName).set(buildDictDuration);
+        distinctColDurationGauge.labels(prjName, 
cubeName).set(distinctColDuration);
+        convertHFileDurationGauge.labels(prjName, 
cubeName).set(convertHFileDuration);
+        inmemCubingDurationGauge.labels(prjName, 
cubeName).set(inmemCubingDuration);
+
+        waitResDurationGauge.labels(prjName, cubeName, 
jobType).set(waitResourceDuration);
+        jobCounter.labels(prjName, cubeName, jobType).inc();
+
+        double errorJobCount = errorJobCounter.labels(prjName, cubeName, 
jobType).get();
+        double jobCount = jobCounter.labels(prjName, cubeName, jobType).get();
+        errorJobPercentage.labels(prjName, cubeName, 
jobType).set(errorJobCount / jobCount);
+    }
+
+    private void addJobExceptionMetrics(Record record) {
+        Map<String, Object> kvs = record.getValueRaw();
+        String prjName = 
String.valueOf(kvs.get(JobPropertyEnum.PROJECT.toString()));
+        String cubeName = 
String.valueOf(kvs.get(JobPropertyEnum.CUBE.toString()));
+        String jobType = 
String.valueOf(kvs.get(JobPropertyEnum.TYPE.toString()));
+
+        Object exception = kvs.get(JobPropertyEnum.EXCEPTION.toString());
+        if (exception != null) {
+            errorJobCounter.labels(prjName, cubeName, jobType).inc();
+        }
+        jobCounter.labels(prjName, cubeName, jobType).inc();
+    }
+
+    public void close() throws Exception {
+        server.stop();
+        logger.info("Prometheus agent closed!");
+    }
+
+    public long getQueryLatencyGauge(String prjName, String cubeName) {
+        return (long) queryLatencyGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getQueryRPCLatencyGauge(String prjName, String cubeName) {
+        return (long) queryRPCLatencyGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getQueryCounter(String prjName, String cubeName) {
+        return (long) queryCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getQueryAvailabilityGauge(String prjName, String cubeName) {
+        return (long) queryAvailabilityGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getSlowQueryPercentage(String prjName, String cubeName) {
+        return (long) slowQueryPercentage.labels(prjName, cubeName).get();
+    }
+
+    public long getBadQueryCounter(String prjName, String cubeName) {
+        return (long) badQueryCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getSlowQueryCounter(String prjName, String cubeName) {
+        return (long) slowQueryCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getNormalQueryCounter(String prjName, String cubeName) {
+        return (long) normalQueryCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getQueryHitCacheCounter(String prjName, String cubeName) {
+        return (long) queryHitCacheCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getQueryHitCachePercentage(String prjName, String cubeName) {
+        return (long) queryHitCachePercentage.labels(prjName, cubeName).get();
+    }
+
+    public long getScanOutOfLimitExCounter(String prjName, String cubeName) {
+        return (long) scanOutOfLimitExCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getEpRPCExceptionCounter(String prjName, String cubeName) {
+        return (long) epRPCExceptionCounter.labels(prjName, cubeName).get();
+    }
+
+    public long getJobCounter(String prjName, String cubeName, String jobType) 
{
+        return (long) jobCounter.labels(prjName, cubeName, jobType).get();
+    }
+
+    public long getErrorJobPercentage(String prjName, String cubeName, String 
jobType) {
+        return (long) errorJobPercentage.labels(prjName, cubeName, 
jobType).get();
+    }
+
+    public long getErrorJobCounter(String prjName, String cubeName, String 
jobType) {
+        return (long) errorJobCounter.labels(prjName, cubeName, jobType).get();
+    }
+
+    public long getJobDurationGauge(String prjName, String cubeName, String 
jobType) {
+        return (long) jobDurationGauge.labels(prjName, cubeName, 
jobType).get();
+    }
+
+    public long getBuildDictDurationGauge(String prjName, String cubeName) {
+        return (long) buildDictDurationGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getDistinctColDurationGauge(String prjName, String cubeName) {
+        return (long) distinctColDurationGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getConvertHFileDurationGauge(String prjName, String cubeName) {
+        return (long) convertHFileDurationGauge.labels(prjName, 
cubeName).get();
+    }
+
+    public long getInmemCubingDurationGauge(String prjName, String cubeName) {
+        return (long) inmemCubingDurationGauge.labels(prjName, cubeName).get();
+    }
+
+    public long getWaitResDurationGauge(String prjName, String cubeName, 
String jobType) {
+        return (long) waitResDurationGauge.labels(prjName, cubeName, 
jobType).get();
+    }
+
+    private class AppMetricsCollector implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                // TODO collect tomcat information from tomcat JMX?
+                ThreadInfo[] allThreads = 
ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
+                int usedTomcatThreadCnt = 0;
+                int usedQueryThreadCnt = 0;
+                int usedCoprocessorThreadCnt = 0;
+                int totalThreadCnt = 0;
+                int ipcThreadCnt = 0;
+                int sharedHConnThreadCnt = 0;
+                for (ThreadInfo threadInfo : allThreads) {
+                    totalThreadCnt++;
+                    String threadName = threadInfo.getThreadName();
+                    if (threadName == null) {
+                        continue;
+                    }
+                    if (isQueryThread(threadName)) {
+                        usedQueryThreadCnt++;
+                        usedTomcatThreadCnt++;
+                        continue;
+                    }
+                    if (threadName.startsWith("http-bio-") && 
threadName.contains("exec")) {
+                        if (isThreadRunningKylinCode(threadInfo)) {
+                            usedTomcatThreadCnt++;
+                        }
+                        continue;
+                    }
+                    if (threadName.startsWith("kylin-coproc-") && 
isThreadRunningKylinCode(threadInfo)) {
+                        usedCoprocessorThreadCnt++;
+                        continue;
+                    }
+                    if (threadName.startsWith("IPC Client")) {
+                        ipcThreadCnt++;
+                        continue;
+                    }
+                    if (threadName.startsWith("hconnection-")) {
+                        sharedHConnThreadCnt++;
+                    }
+                }
+                usedQueryThreadsGauge.set(usedQueryThreadCnt);
+                usedTomcatThreadsGauge.set(usedTomcatThreadCnt);
+                usedCoprocessorThreadsGauge.set(usedCoprocessorThreadCnt);
+                hbaseIPCThreadsGauge.set(ipcThreadCnt);
+                sharedHConnectionThreadsGauge.set(sharedHConnThreadCnt);
+                totalThreadsGauge.set(totalThreadCnt);
+            } catch (Exception e) {
+                logger.error("error when collect app metrics", e);
+            }
+        }
+
+        private boolean isQueryThread(String threadName) {
+            if (!threadName.startsWith("Query ")) {
+                return false;
+            }
+            try {
+                
Long.parseLong(threadName.substring(threadName.lastIndexOf('-') + 1)); // 
thread id
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        }
+
+        private boolean isThreadRunningKylinCode(ThreadInfo threadInfo) {
+            StackTraceElement[] stackTraces = threadInfo.getStackTrace();
+            for (StackTraceElement stackTrace : stackTraces) {
+                if (stackTrace.getClassName().contains("kylin")) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+}
diff --git 
a/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java
 
b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java
new file mode 100644
index 0000000..f4074ac
--- /dev/null
+++ 
b/metrics-reporter-prometheus/src/main/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.metrics.lib.impl.prometheus;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrometheusReservoirReporter extends ActiveReservoirReporter {
+    protected static final Logger logger = 
LoggerFactory.getLogger(PrometheusReservoirReporter.class);
+    public static final String PROMETHEUS_REPORTER_SUFFIX = "PROMETHEUS";
+    private final ActiveReservoir activeReservoir;
+    private final PrometheusReservoirListener listener;
+
+    public PrometheusReservoirReporter(ActiveReservoir activeReservoir, 
Properties props) throws Exception {
+        this.activeReservoir = activeReservoir;
+        this.listener = new PrometheusReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link PrometheusReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link 
PrometheusReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    /**
+     * Starts the reporter.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    protected PrometheusReservoirListener getListener() {
+        return listener;
+    }
+
+    /**
+     * A builder for {@link PrometheusReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        private void setFixedProperties() {
+        }
+
+        /**
+         * Builds a {@link PrometheusReservoirReporter} with the given 
properties.
+         *
+         * @return a {@link PrometheusReservoirReporter}
+         */
+        public PrometheusReservoirReporter build() throws Exception {
+            setFixedProperties();
+            return new PrometheusReservoirReporter(registry, props);
+        }
+    }
+
+    protected class PrometheusReservoirListener implements 
ActiveReservoirListener {
+
+        private int nRecord = 0;
+        private int nRecordSkip = 0;
+
+        PrometheusEventProducer producer;
+
+        private PrometheusReservoirListener(Properties props) throws Exception 
{
+            producer = new PrometheusEventProducer(props);
+        }
+
+        public boolean onRecordUpdate(final List<Record> records) {
+            try {
+                producer.add(records);
+                nRecord += records.size();
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                nRecordSkip += records.size();
+                return false;
+            }
+            return true;
+        }
+
+        public void close() {
+            try {
+                producer.close();
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        public int getNRecord() {
+            return nRecord;
+        }
+
+        public int getNRecordSkip() {
+            return nRecordSkip;
+        }
+    }
+}
diff --git 
a/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java
 
b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java
new file mode 100644
index 0000000..19d2c88
--- /dev/null
+++ 
b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusEventProducerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.prometheus;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.JobPropertyEnum;
+import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
+import org.apache.kylin.metrics.property.QueryPropertyEnum;
+import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PrometheusEventProducerTest {
+
+    private static final String project = "default";
+    private static final String cubeName = "ssb";
+
+    private PrometheusEventProducer producer;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/localmeta");
+        producer = new PrometheusEventProducer(new Properties());
+    }
+
+    @After
+    public void after() throws Exception {
+        producer.close();
+        System.clearProperty(KylinConfig.KYLIN_CONF);
+    }
+
+    @Test
+    public void testAddRecord() throws Exception {
+        producer.add(mockQueryRpcCallEvent());
+        assertEquals(0L, producer.getQueryRPCLatencyGauge(project, cubeName));
+        producer.add(mockQueryRpcCallExceptionEvent());
+        assertEquals(500L, producer.getQueryRPCLatencyGauge(project, 
cubeName));
+
+        producer.add(mockQueryCubeEvent());
+        producer.add(mockQueryEvent());
+        assertEquals(55L, producer.getQueryLatencyGauge(project, cubeName));
+        assertEquals(0L, producer.getQueryAvailabilityGauge(project, 
cubeName));
+        assertEquals(0L, producer.getSlowQueryPercentage(project, cubeName));
+        assertEquals(0L, producer.getQueryHitCachePercentage(project, 
cubeName));
+
+        producer.add(mockJobEvent());
+        assertEquals(7200000L, producer.getJobDurationGauge(project, cubeName, 
"BUILD"));
+        assertEquals(500000L, producer.getBuildDictDurationGauge(project, 
cubeName));
+        assertEquals(2000000L, producer.getDistinctColDurationGauge(project, 
cubeName));
+        assertEquals(1500000L, producer.getInmemCubingDurationGauge(project, 
cubeName));
+        assertEquals(1000000L, producer.getConvertHFileDurationGauge(project, 
cubeName));
+        assertEquals(300000L, producer.getWaitResDurationGauge(project, 
cubeName, "BUILD"));
+        assertEquals(0L, producer.getErrorJobPercentage(project, cubeName, 
"BUILD"));
+        producer.add(mockJobExceptionEvent());
+        producer.add(mockTestEvent());
+
+        assertEquals(1L, producer.getQueryCounter(project, cubeName));
+        assertEquals(0L, producer.getBadQueryCounter(project, cubeName));
+        assertEquals(0L, producer.getSlowQueryCounter(project, cubeName));
+        assertEquals(1L, producer.getNormalQueryCounter(project, cubeName));
+        assertEquals(0L, producer.getQueryHitCacheCounter(project, cubeName));
+        assertEquals(0L, producer.getScanOutOfLimitExCounter(project, 
cubeName));
+        assertEquals(1L, producer.getEpRPCExceptionCounter(project, cubeName));
+        assertEquals(2L, producer.getJobCounter(project, cubeName, "BUILD"));
+        assertEquals(1L, producer.getErrorJobCounter(project, cubeName, 
"BUILD"));
+    }
+
+    private Record mockQueryRpcCallEvent() {
+        RecordEvent metricsEvent = new TimedRecordEvent(
+                
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
+        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), 
cubeName);
+        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), 
"localhost");
+        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), "NULL");
+
+        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 50L);
+        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 0L);
+        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), 10L);
+        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), 10L);
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), 
0L);
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 0L);
+        return metricsEvent;
+    }
+
+    private Record mockQueryRpcCallExceptionEvent() {
+        RecordEvent metricsEvent = new TimedRecordEvent(
+                
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
+        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), 
cubeName);
+        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), 
"localhost");
+        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), 
ResourceLimitExceededException.class);
+
+        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 500L);
+        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 0L);
+        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), 0L);
+        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), 0L);
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), 
0L);
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 0L);
+        return metricsEvent;
+    }
+
+    private Record mockQueryCubeEvent() {
+        RecordEvent metricsEvent = new TimedRecordEvent(
+                
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube());
+        metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), 
"20130101000000_20131201000000");
+        metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), 255L);
+        metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), 255L);
+        metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), true);
+        metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), 0L);
+
+        metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), 1L);
+        metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), 50L);
+        metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), 50L);
+        metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), 0L);
+        metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), 10L);
+        metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), 10L);
+        metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), 
0L);
+        metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), 0L);
+        metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), true);
+        metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), 1.0);
+        return metricsEvent;
+    }
+
+    private Record mockQueryEvent() {
+        RecordEvent metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery());
+        metricsEvent.put(QueryPropertyEnum.USER.toString(), "ADMIN");
+        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), "123456");
+        metricsEvent.put(QueryPropertyEnum.TYPE.toString(), "OLAP");
+        metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), cubeName);
+        metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), 
"Cube");
+        metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(), "NULL");
+
+        metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), 55L);
+        metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), 
10L);
+        metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), 
10L);
+        metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), 0L);
+        return metricsEvent;
+    }
+
+    private Record mockJobEvent() {
+        RecordEvent metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
+        metricsEvent.put(JobPropertyEnum.USER.toString(), "ADMIN");
+        metricsEvent.put(JobPropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), "dfaf3fafaf");
+        metricsEvent.put(JobPropertyEnum.TYPE.toString(), "BUILD");
+        metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), "INMEM");
+
+        metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 1000000000L);
+        metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), 10000000L);
+        metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 7200000L);
+        metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
300000L);
+        metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 
0.0072);
+        
metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), 
2000000L);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 
500000L);
+        
metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
1500000L);
+        
metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
1000000L);
+        return metricsEvent;
+    }
+
+    private Record mockJobExceptionEvent() {
+        RecordEvent metricsEvent = new TimedRecordEvent(
+                
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
+        metricsEvent.put(JobPropertyEnum.USER.toString(), "ADMIN");
+        metricsEvent.put(JobPropertyEnum.PROJECT.toString(), project);
+        metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), "dfaf3fafaf");
+        metricsEvent.put(JobPropertyEnum.TYPE.toString(), "BUILD");
+        metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), "INMEM");
+        metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(),
+                "org.apache.hadoop.security.authorize.AuthorizationException");
+        return metricsEvent;
+    }
+
+    private Record mockTestEvent() {
+        RecordEvent metricsEvent = new TimedRecordEvent("TEST");
+        return metricsEvent;
+    }
+}
diff --git 
a/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java
 
b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java
new file mode 100644
index 0000000..da1d5f9
--- /dev/null
+++ 
b/metrics-reporter-prometheus/src/test/java/org/apache/kylin/metrics/lib/impl/prometheus/PrometheusReservoirReporterTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.prometheus;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.InstantReservoir;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+import com.google.common.collect.Lists;
+
+@PrepareForTest({ 
PrometheusReservoirReporter.PrometheusReservoirListener.class })
+public class PrometheusReservoirReporterTest {
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private PrometheusReservoirReporter prometheusReporter;
+    private ActiveReservoir reservoir;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/localmeta");
+
+        PrometheusEventProducer producer = 
PowerMockito.mock(PrometheusEventProducer.class);
+        
PowerMockito.whenNew(PrometheusEventProducer.class).withAnyArguments().thenReturn(producer);
+
+        reservoir = new InstantReservoir();
+        reservoir.start();
+        prometheusReporter = 
PrometheusReservoirReporter.forRegistry(reservoir).build();
+    }
+
+    @After
+    public void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF);
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        Record record = new RecordEvent("TEST");
+        reservoir.update(record);
+        assertEquals(0, prometheusReporter.getListener().getNRecord());
+
+        prometheusReporter.start();
+        reservoir.update(record);
+        reservoir.update(record);
+        assertEquals(2, prometheusReporter.getListener().getNRecord());
+
+        prometheusReporter.stop();
+        reservoir.update(record);
+        assertEquals(2, prometheusReporter.getListener().getNRecord());
+
+        prometheusReporter.start();
+        PowerMockito.doThrow(new 
RuntimeException()).when(prometheusReporter.getListener().producer)
+                .add(Lists.newArrayList(record));
+        reservoir.update(record);
+        assertEquals(2, prometheusReporter.getListener().getNRecord());
+        assertEquals(1, prometheusReporter.getListener().getNRecordSkip());
+    }
+}
diff --git a/pom.xml b/pom.xml
index 916ad18..ed96e06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,9 @@
     <!-- Kafka versions -->
     <kafka.version>2.1.1</kafka.version>
 
+    <!-- Prometheus versions -->
+    <prometheus.version>0.0.21</prometheus.version>
+
     <!-- Spark versions -->
     <spark.version>2.4.5</spark.version>
     <kryo.version>4.0.0</kryo.version>
@@ -273,6 +276,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-metrics-reporter-prometheus</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-core-metadata</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -1387,6 +1395,7 @@
     <module>core-metrics</module>
     <module>metrics-reporter-hive</module>
     <module>metrics-reporter-kafka</module>
+    <module>metrics-reporter-prometheus</module>
     <module>cache</module>
     <module>cube-migration</module>
     <module>datasource-sdk</module>
diff --git a/server/src/main/resources/kylinMetrics.xml 
b/server/src/main/resources/kylinMetrics.xml
index 2b89553..5a31d3b 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -51,9 +51,9 @@
             <list>
                 <ref bean="hiveSink"/>
                 <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" 
value-type="java.util.List">
-                    <!--
                     <entry key-ref="instantReservoir">
                         <list>
+                            <!--
                             <bean class="org.apache.kylin.common.util.Pair">
                                 <property name="first"
                                           
value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/>
@@ -63,9 +63,17 @@
                                     </props>
                                 </property>
                             </bean>
+                            -->
+                            <bean class="org.apache.kylin.common.util.Pair">
+                                <property name="first"
+                                          
value="org.apache.kylin.metrics.lib.impl.prometheus.PrometheusReservoirReporter"/>
+                                <property name="second">
+                                    <props>
+                                    </props>
+                                </property>
+                            </bean>
                         </list>
                     </entry>
-                    -->
                     <entry key-ref="blockingReservoir">
                         <list>
                             <bean class="org.apache.kylin.common.util.Pair">

Reply via email to