This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d24102  prometheus metrics for functions served via brokers or 
function instances should match (#3066)
8d24102 is described below

commit 8d241022724fff411f6841ff84b970f01d5dbd18
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Tue Nov 27 14:49:01 2018 -0800

    prometheus metrics for functions served via brokers or function instances 
should match (#3066)
    
    * prometheus metrics for functions served via brokers or instances 
themselves should match
    
    * add additional testing
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 248 +++++++++++++++++++--
 .../functions/instance/FunctionStatsManager.java   |  15 ++
 .../functions/instance/JavaInstanceRunnable.java   |   7 +-
 .../functions/runtime/KubernetesRuntime.java       |   7 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |  10 +-
 .../apache/pulsar/functions/runtime/Runtime.java   |   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |  21 ++
 .../pulsar/functions/runtime/ThreadRuntime.java    |   6 +
 .../functions/worker/FunctionsStatsGenerator.java  |  43 +---
 .../worker/FunctionStatsGeneratorTest.java         | 220 ------------------
 10 files changed, 294 insertions(+), 285 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 9214d25..f2869e6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -18,28 +18,9 @@
  */
 package org.apache.pulsar.io;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.ToString;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -65,6 +46,7 @@ import 
org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -76,6 +58,33 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Test Pulsar sink on function
  *
@@ -391,6 +400,79 @@ public class PulsarFunctionE2ETest {
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
         
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
 
+        // validate prometheus metrics empty
+        String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_function_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, Double.NaN);
+        m = metrics.get("pulsar_function_process_latency_ms_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, Double.NaN);
+        m = metrics.get("pulsar_function_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+
+
         // validate function instance stats empty
         FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, 
namespacePortion,
                 functionName, 0,  null);
@@ -464,6 +546,78 @@ public class PulsarFunctionE2ETest {
 
         assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
         assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics());
+
+        // validate prometheus metrics
+        prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        metrics = parseMetrics(prometheusMetrics);
+        m = metrics.get("pulsar_function_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_user_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_user_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_process_latency_ms_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_function_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_function_processed_successfully_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_function_processed_successfully_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.value, (double) totalMsgs);
     }
 
     @Test(timeOut = 20000)
@@ -628,4 +782,58 @@ public class PulsarFunctionE2ETest {
 
         producer.close();
     }
+
+    public static String getPrometheusMetrics(int metricsPort) throws 
IOException {
+        StringBuilder result = new StringBuilder();
+        URL url = new URL(String.format("http://%s:%s/metrics";, 
InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("GET");
+        BufferedReader rd = new BufferedReader(new 
InputStreamReader(conn.getInputStream()));
+        String line;
+        while ((line = rd.readLine()) != null) {
+            result.append(line + System.lineSeparator());
+        }
+        rd.close();
+        return result.toString();
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Sould be good enough for unit 
tests
+     */
+    private static Map<String, Metric> parseMetrics(String metrics) {
+        Map<String, Metric> parsed = new HashMap<>();
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", 
namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = 
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+        Arrays.asList(metrics.split("\n")).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+            Matcher matcher = pattern.matcher(line);
+            checkArgument(matcher.matches());
+            String name = matcher.group(1);
+            Metric m = new Metric();
+            m.value = Double.valueOf(matcher.group(3));
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+            parsed.put(name, m);
+        });
+        return parsed;
+    }
+
+    /** A single parsed Prometheus sample: its labels and its numeric value. */
+    @ToString
+    static class Metric {
+        // label name -> label value; a TreeMap keeps the labels sorted by name
+        Map<String, String> tags = new TreeMap<String, String>();
+        // numeric sample value
+        double value;
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 139208b..c1b7574 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -23,11 +23,14 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
+import io.prometheus.client.exporter.common.TextFormat;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +92,8 @@ public class FunctionStatsManager implements AutoCloseable {
 
     private ScheduledFuture<?> scheduledFuture;
 
+    private final CollectorRegistry collectorRegistry;
+
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions = EvictingQueue.create(10);
     @Getter
@@ -96,6 +101,8 @@ public class FunctionStatsManager implements AutoCloseable {
 
     public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
 
+        this.collectorRegistry = collectorRegistry;
+
         this.metricsLabels = metricsLabels;
 
         statTotalProcessedSuccessfully = Counter.build()
@@ -326,6 +333,14 @@ public class FunctionStatsManager implements AutoCloseable 
{
         latestSystemExceptions.clear();
     }
 
+    /**
+     * Serializes every metric family held by the {@link CollectorRegistry} given to the
+     * constructor, using the Prometheus text format version 0.0.4.
+     *
+     * <p>NOTE(review): this dumps the whole registry; if that registry is shared with other
+     * collectors, their samples are included as well — confirm against the callers.
+     *
+     * @return the serialized metrics
+     * @throws IOException if writing the metrics fails
+     */
+    public String getStatsAsString() throws IOException {
+        try (StringWriter writer = new StringWriter()) {
+            TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+            return writer.toString();
+        }
+    }
+
     @Override
     public void close() {
         scheduledFuture.cancel(false);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1d70356..a9e9177 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,7 +24,6 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
-import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -38,11 +37,8 @@ import 
org.apache.bookkeeper.clients.exceptions.ClientException;
 import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
-import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.Backoff.Jitter;
 import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.common.util.Retries;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -69,7 +65,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.StateUtils;
@@ -86,6 +81,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -119,6 +115,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Throwable deathException;
 
     // function stats
+    @Getter
     private FunctionStatsManager stats;
 
     private Record<?> currentRecord;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 482943b..fd6c4c4 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -47,6 +47,8 @@ import io.kubernetes.client.models.V1ServiceSpec;
 import io.kubernetes.client.models.V1StatefulSet;
 import io.kubernetes.client.models.V1StatefulSetSpec;
 import io.kubernetes.client.models.V1Toleration;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -300,6 +302,11 @@ class KubernetesRuntime implements Runtime {
     }
 
     @Override
+    public String getPrometheusMetrics() throws IOException {
+        // Delegates to the shared HTTP scraper with this runtime's fixed METRICS_PORT.
+        // NOTE(review): RuntimeUtils.getPrometheusMetrics queries the *local* host address, while a
+        // Kubernetes function instance presumably runs in its own pod — confirm the right target host.
+        final int port = METRICS_PORT;
+        return RuntimeUtils.getPrometheusMetrics(port);
+    }
+
+    @Override
     public boolean isAlive() {
         return running;
     }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index e39c7ad..4edac17 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Utils;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.TimerTask;
@@ -58,6 +59,7 @@ class ProcessRuntime implements Runtime {
     @Getter
     private List<String> processArgs;
     private int instancePort;
+    private int metricsPort;
     @Getter
     private Throwable deathException;
     private ManagedChannel channel;
@@ -81,6 +83,7 @@ class ProcessRuntime implements Runtime {
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
+        this.metricsPort = Utils.findAvailablePort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
@@ -119,7 +122,7 @@ class ProcessRuntime implements Runtime {
             false,
             null,
             null,
-                Utils.findAvailablePort());
+                this.metricsPort);
     }
 
     /**
@@ -268,6 +271,11 @@ class ProcessRuntime implements Runtime {
         return retval;
     }
 
+    @Override
+    public String getPrometheusMetrics() throws IOException {
+        // metricsPort is picked once in the constructor and passed into the instance command line
+        // built there, so scrape that same port.
+        final int port = this.metricsPort;
+        return RuntimeUtils.getPrometheusMetrics(port);
+    }
+
     public CompletableFuture<InstanceCommunication.HealthCheckResult> 
healthCheck() {
         CompletableFuture<InstanceCommunication.HealthCheckResult> retval = 
new CompletableFuture<>();
         if (stub == null) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index ac1eced..fafdca7 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
 
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -46,4 +47,5 @@ public interface Runtime {
     
     CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
 
+    /**
+     * Returns this runtime's function metrics rendered in the Prometheus text format.
+     *
+     * @return the metrics text
+     * @throws IOException if the metrics cannot be retrieved or serialized
+     */
+    String getPrometheusMetrics() throws IOException;
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 05f21e6..a5f55c5 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -20,6 +20,13 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.protobuf.util.JsonFormat;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
@@ -186,4 +193,18 @@ class RuntimeUtils {
         args.add(instanceConfig.getClusterName());
         return args;
     }
+
+    /**
+     * Scrapes the Prometheus metrics endpoint listening on {@code metricsPort} at this host's address.
+     *
+     * @param metricsPort HTTP port the function instance serves its metrics on
+     * @return the response body, with every line terminated by {@link System#lineSeparator()}
+     * @throws IOException if connecting to or reading from the endpoint fails
+     */
+    public static String getPrometheusMetrics(int metricsPort) throws IOException {
+        URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        try {
+            conn.setRequestMethod("GET");
+            StringBuilder result = new StringBuilder();
+            // This runs on every broker metrics scrape, so never leak the stream on a failed read.
+            // The Prometheus text format is UTF-8, so do not depend on the platform charset.
+            try (BufferedReader rd = new BufferedReader(
+                    new InputStreamReader(conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = rd.readLine()) != null) {
+                    result.append(line).append(System.lineSeparator());
+                }
+            }
+            return result.toString();
+        } finally {
+            // Also releases the connection when getInputStream() throws (e.g. instance not up yet).
+            conn.disconnect();
+        }
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 460cdb0..be049c3 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 import io.prometheus.client.CollectorRegistry;
@@ -157,6 +158,11 @@ class ThreadRuntime implements Runtime {
     }
 
     @Override
+    public String getPrometheusMetrics() throws IOException {
+        // The instance runs in this JVM, so serialize its stats manager directly instead of using HTTP.
+        // NOTE(review): getStats() may still be null before the runnable has initialized — confirm.
+        return this.javaInstanceRunnable.getStats().getStatsAsString();
+    }
+
+    @Override
     public CompletableFuture<Void> resetMetrics() {
         javaInstanceRunnable.resetMetrics();
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 2b8e0fd..cd186cc 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -18,17 +18,15 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import org.apache.pulsar.functions.instance.FunctionStatsManager;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 /**
  * A class to generate stats for pulsar functions running on this broker
@@ -57,31 +55,10 @@ public class FunctionsStatsGenerator {
                     Runtime functionRuntime = 
functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = 
functionRuntime.getMetrics().get();
-
-                            String tenant = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getTenant();
-                            String namespace = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getNamespace();
-                            String name = 
functionRuntimeInfo.getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails().getName();
-                            int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                            String qualifiedNamespace = String.format("%s/%s", 
tenant, namespace);
-
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, 
metrics.getAvgProcessLatency());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, 
metrics.getProcessedSuccessfullyTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, 
metrics.getSystemExceptionsTotal());
-                            metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + 
FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, 
metrics.getUserExceptionsTotal());
 
-                            for (Map.Entry<String, Double> userMetricsMapEntry 
: metrics.getUserMetricsMap().entrySet()) {
-                                String userMetricName = 
userMetricsMapEntry.getKey();
-                                Double val = userMetricsMapEntry.getValue();
-                                metric(out, cluster, qualifiedNamespace, name, 
FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, 
instanceId, val);
-                            }
+                            out.write(functionRuntime.getPrometheusMetrics());
 
-                        } catch (InterruptedException | ExecutionException e) {
+                        } catch (IOException e) {
                             log.warn("Failed to collect metrics for function 
instance {}",
                                     fullyQualifiedInstanceName, e);
                         }
@@ -90,16 +67,4 @@ public class FunctionsStatsGenerator {
             }
         }
     }
-
-    private static void metricType(SimpleTextOutputStream stream, String name) 
{
-        stream.write("# TYPE ").write(name).write(" gauge\n");
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace,
-                               String functionName, String metricName, int 
instanceId, double value) {
-        metricType(stream, metricName);
-        
stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                
.write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"}
 ");
-        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
-    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
deleted file mode 100644
index 4b54bcf..0000000
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import lombok.ToString;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.Utils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-public class FunctionStatsGeneratorTest {
-
-    @Test
-    public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
-        WorkerService workerService = mock(WorkerService.class);
-        when(workerService.isInitialized()).thenReturn(false);
-        FunctionsStatsGenerator.generate(
-            workerService, "test-cluster", new 
SimpleTextOutputStream(Unpooled.buffer()));
-        verify(workerService, times(1)).isInitialized();
-        verify(workerService, times(0)).getFunctionRuntimeManager();
-    }
-
-    @Test
-    public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
-        WorkerService workerService = mock(WorkerService.class);
-        when(workerService.isInitialized()).thenReturn(true);
-        FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
-        
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
-        when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
-        FunctionsStatsGenerator.generate(
-            workerService, "test-cluster", new 
SimpleTextOutputStream(Unpooled.buffer()));
-        verify(workerService, times(1)).isInitialized();
-        verify(workerService, times(1)).getFunctionRuntimeManager();
-        verify(frm, times(0)).getFunctionRuntimeInfos();
-    }
-
-    @Test
-    public void testFunctionsStatsGenerate() {
-        FunctionRuntimeManager functionRuntimeManager = 
mock(FunctionRuntimeManager.class);
-        Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
HashMap<>();
-
-        WorkerService workerService = mock(WorkerService.class);
-        
doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
-        doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
-        when(workerService.isInitialized()).thenReturn(true);
-
-        CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
-        InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
-                .setReceivedTotal(101)
-                .setProcessedSuccessfullyTotal(99)
-                .setAvgProcessLatency(10.0)
-                .setUserExceptionsTotal(3)
-                .setSystemExceptionsTotal(1)
-                .setLastInvocation(1542324900)
-                .build();
-
-        metricsDataCompletableFuture.complete(metricsData);
-        Runtime runtime = mock(Runtime.class);
-        doReturn(metricsDataCompletableFuture).when(runtime).getMetrics();
-
-        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
-        doReturn(runtime).when(runtimeSpawner).getRuntime();
-
-        Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                Function.FunctionDetails.newBuilder()
-                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
-
-        Function.Instance instance = Function.Instance.newBuilder()
-                .setFunctionMetaData(function1).setInstanceId(0).build();
-
-        FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
-        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
-        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
-        
functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), 
functionRuntimeInfo);
-        
doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
-
-        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
-        SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
-        FunctionsStatsGenerator.generate(workerService, "default", statsOut);
-
-        String str = buf.toString(Charset.defaultCharset());
-
-        buf.release();
-        Map<String, Metric> metrics = parseMetrics(str);
-
-        Assert.assertEquals(metrics.size(), 6);
-
-        System.out.println("metrics: " + metrics);
-        Metric m = metrics.get("pulsar_function_received_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 101.0);
-
-        m = metrics.get("pulsar_function_user_exceptions_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 3.0);
-
-        m = metrics.get("pulsar_function_process_latency_ms");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 10.0);
-
-        m = metrics.get("pulsar_function_system_exceptions_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 1.0);
-
-        m = metrics.get("pulsar_function_last_invocation");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 1542324900.0);
-
-        m = metrics.get("pulsar_function_processed_successfully_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 99.0);
-    }
-
-    /**
-     * Hacky parsing of Prometheus text format. Sould be good enough for unit 
tests
-     */
-    private static Map<String, Metric> parseMetrics(String metrics) {
-        Map<String, Metric> parsed = new HashMap<>();
-
-        // Example of lines are
-        // jvm_threads_current{cluster="standalone",} 203.0
-        // or
-        // pulsar_subscriptions_count{cluster="standalone", 
namespace="sample/standalone/ns1",
-        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
-        Pattern pattern = 
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
-        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
-
-        Arrays.asList(metrics.split("\n")).forEach(line -> {
-            if (line.isEmpty() || line.startsWith("#")) {
-                return;
-            }
-            Matcher matcher = pattern.matcher(line);
-
-            checkArgument(matcher.matches());
-            String name = matcher.group(1);
-
-            Metric m = new Metric();
-            m.value = Double.valueOf(matcher.group(3));
-
-            String tags = matcher.group(2);
-            Matcher tagsMatcher = tagsPattern.matcher(tags);
-            while (tagsMatcher.find()) {
-                String tag = tagsMatcher.group(1);
-                String value = tagsMatcher.group(2);
-                m.tags.put(tag, value);
-            }
-
-            parsed.put(name, m);
-        });
-
-        return parsed;
-    }
-
-    @ToString
-    static class Metric {
-        Map<String, String> tags = new TreeMap<>();
-        double value;
-    }
-
-}

Reply via email to