This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6583d2c Refactor function metrics to use prometheus (#2914) 6583d2c is described below commit 6583d2ceeb57bac60a92a3ab09df7f2cb384377e Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Nov 3 22:14:36 2018 -0700 Refactor function metrics to use prometheus (#2914) * Beginging to refactor function metrics to use prometheus * fix unit test * removing test code * fix test * minor refactoring * addressing comments * adding python instance * remove test code * adding prometheus client as instance dependency * fix bug * adding prometheus to license file --- conf/functions_worker.yml | 1 - distribution/server/src/assemble/LICENSE.bin.txt | 2 + .../apache/pulsar/io/PulsarFunctionE2ETest.java | 1 - pulsar-client-cpp/python/setup.py | 2 +- pulsar-functions/instance/pom.xml | 20 ++ .../pulsar/functions/instance/FunctionStats.java | 207 +++++++---------- .../functions/instance/JavaInstanceRunnable.java | 89 ++++---- .../instance/src/main/python/python_instance.py | 248 +++++++++------------ .../pulsar/functions/runtime/JavaInstanceMain.java | 6 + .../functions/worker/FunctionRuntimeManager.java | 21 +- .../functions/worker/FunctionsStatsGenerator.java | 12 +- .../pulsar/functions/worker/WorkerConfig.java | 1 - .../pulsar/functions/worker/WorkerService.java | 6 - .../functions/worker/rest/api/WorkerImpl.java | 4 +- .../worker/FunctionStatsGeneratorTest.java | 21 +- 15 files changed, 288 insertions(+), 353 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index bd6281b..23c3bb8 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -65,7 +65,6 @@ assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 # Frequency how often worker performs compaction on function-topics topicCompactionFrequencySec: 1800 -metricsSamplingPeriodSec: 60 ############################### diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 9d4deb9..3823cc0 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -467,6 +467,8 @@ The Apache Software License, Version 2.0 - io.dropwizard.metrics-metrics-core-3.1.0.jar - io.dropwizard.metrics-metrics-graphite-3.1.0.jar - io.dropwizard.metrics-metrics-jvm-3.1.0.jar + * Prometheus + - io.prometheus-simpleclient_httpserver-0.5.0.jar BSD 3-clause "New" or "Revised" License diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 494e0db..e683b6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -357,7 +357,6 @@ public class PulsarFunctionE2ETest { }, 5, 200); FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); - functionRuntimeManager.updateRates(); FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion, functionName, null); diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py index 304d345..952c57b 100644 --- a/pulsar-client-cpp/python/setup.py +++ b/pulsar-client-cpp/python/setup.py @@ -70,6 +70,6 @@ setup( license="Apache License v2.0", url="http://pulsar.apache.org/", install_requires=[ - 'grpcio', 'protobuf' + 'grpcio', 'protobuf', "prometheus_client" ], ) diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index 8dec8dc..f734b03 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -107,6 +107,26 @@ <artifactId>typetools</artifactId> </dependency> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient</artifactId> + <version>${prometheus.version}</version> + </dependency> + + <!-- Hotspot JVM metrics--> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_hotspot</artifactId> + <version>${prometheus.version}</version> + </dependency> + + <!-- Exposition HTTPServer--> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_httpserver</artifactId> + <version>${prometheus.version}</version> + </dependency> + </dependencies> <build> diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java index f2195de..15b01f7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java @@ -18,16 +18,15 @@ */ package org.apache.pulsar.functions.instance; +import com.google.common.collect.EvictingQueue; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Summary; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.proto.InstanceCommunication; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - /** * Function stats. */ @@ -35,132 +34,94 @@ import java.util.Map; @Getter @Setter public class FunctionStats { + + private static final String[] metricsLabelNames = {"tenant", "namespace", "name", "instance_id"}; + + /** Declare Prometheus stats **/ + + final Counter statTotalProcessed; + + final Counter statTotalProcessedSuccessfully; + + final Counter statTotalSysExceptions; + + final Counter statTotalUserExceptions; + + final Summary statProcessLatency; + + CollectorRegistry functionCollectorRegistry; + + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); + @Getter @Setter - class Stats { - private long totalProcessed; - private long totalSuccessfullyProcessed; - private long totalUserExceptions; - private List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = - new LinkedList<>(); - private long totalSystemExceptions; - private List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = - new LinkedList<>(); - private Map<String, Long> totalDeserializationExceptions = new HashMap<>(); - private long totalSerializationExceptions; - private long totalLatencyMs; - private long lastInvocationTime; - - public void incrementProcessed(long processedAt) { - totalProcessed++; - lastInvocationTime = processedAt; - } - public void incrementSuccessfullyProcessed(long latency) { - totalSuccessfullyProcessed++; - totalLatencyMs += latency; - } - public void incrementUserExceptions(Exception ex) { - InstanceCommunication.FunctionStatus.ExceptionInformation info = - InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() - .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build(); - latestUserExceptions.add(info); - if (latestUserExceptions.size() > 10) { - latestUserExceptions.remove(0); - } - totalUserExceptions++; - } - public void incrementSystemExceptions(Exception ex) { - InstanceCommunication.FunctionStatus.ExceptionInformation info = - InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() - .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build(); - latestSystemExceptions.add(info); - if (latestSystemExceptions.size() > 10) { - latestSystemExceptions.remove(0); - } - totalSystemExceptions++; - } - public void incrementDeserializationExceptions(String topic) { - if (!totalDeserializationExceptions.containsKey(topic)) { - totalDeserializationExceptions.put(topic, 0l); - } - totalDeserializationExceptions.put(topic, totalDeserializationExceptions.get(topic) + 1); - } - public void incrementSerializationExceptions() { totalSerializationExceptions++; } - public void reset() { - totalProcessed = 0; - totalSuccessfullyProcessed = 0; - totalUserExceptions = 0; - totalSystemExceptions = 0; - totalDeserializationExceptions.clear(); - totalSerializationExceptions = 0; - totalLatencyMs = 0; - } - public double computeLatency() { - if (totalSuccessfullyProcessed <= 0) { - return 0; - } else { - return totalLatencyMs / (double) totalSuccessfullyProcessed; - } - } - - public void update(Stats stats) { - if (stats == null) { - return; - } - this.totalProcessed = stats.totalProcessed; - this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed; - this.totalUserExceptions = stats.totalUserExceptions; - this.latestUserExceptions.clear(); - this.latestSystemExceptions.clear(); - this.totalDeserializationExceptions.clear(); - this.latestUserExceptions.addAll(stats.latestUserExceptions); - this.latestSystemExceptions.addAll(stats.latestSystemExceptions); - this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions); - this.totalSystemExceptions = stats.totalSystemExceptions; - this.latestSystemExceptions = stats.latestSystemExceptions; - this.totalSerializationExceptions = stats.totalSerializationExceptions; - this.totalLatencyMs = stats.totalLatencyMs; - this.lastInvocationTime = stats.lastInvocationTime; - } - } + private long lastInvocationTime = 0; - private Stats currentStats; - private Stats totalStats; - private Stats stats; - public FunctionStats() { - currentStats = new Stats(); - stats = new Stats(); - totalStats = new Stats(); - } + // Declare function local collector registry so that it will not clash with other function instances' + // metrics collection especially in threaded mode + functionCollectorRegistry = new CollectorRegistry(); - public void incrementProcessed(long processedAt) { - currentStats.incrementProcessed(processedAt); - totalStats.incrementProcessed(processedAt); - } + statTotalProcessed = Counter.build() + .name("__function_total_processed__") + .help("Total number of messages processed.") + .labelNames(metricsLabelNames) + .register(functionCollectorRegistry); - public void incrementSuccessfullyProcessed(long latency) { - currentStats.incrementSuccessfullyProcessed(latency); - totalStats.incrementSuccessfullyProcessed(latency); - } - public void incrementUserExceptions(Exception ex) { - currentStats.incrementUserExceptions(ex); - totalStats.incrementUserExceptions(ex); - } - public void incrementSystemExceptions(Exception ex) { - currentStats.incrementSystemExceptions(ex); - totalStats.incrementSystemExceptions(ex); + statTotalProcessedSuccessfully = Counter.build() + .name("__function_total_successfully_processed__") + .help("Total number of messages processed successfully.") + .labelNames(metricsLabelNames) + .register(functionCollectorRegistry); + + statTotalSysExceptions = Counter.build() + .name("__function_total_system_exceptions__") + .help("Total number of system exceptions.") + .labelNames(metricsLabelNames) + .register(functionCollectorRegistry); + + statTotalUserExceptions = Counter.build() + .name("__function_total_user_exceptions__") + .help("Total number of user exceptions.") + .labelNames(metricsLabelNames) + .register(functionCollectorRegistry); + + statProcessLatency = Summary.build() + .name("__function_process_latency_ms__").help("Process latency in milliseconds.") + .quantile(0.5, 0.01) + .quantile(0.9, 0.01) + .quantile(0.99, 0.01) + .quantile(0.999, 0.01) + .labelNames(metricsLabelNames) + .register(functionCollectorRegistry); } - public void incrementDeserializationExceptions(String topic) { - currentStats.incrementDeserializationExceptions(topic); - totalStats.incrementDeserializationExceptions(topic); + + public void addUserException(Exception ex) { + InstanceCommunication.FunctionStatus.ExceptionInformation info = + InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() + .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build(); + latestUserExceptions.add(info); } - public void incrementSerializationExceptions() { - currentStats.incrementSerializationExceptions(); - totalStats.incrementSerializationExceptions(); + + public void addSystemException(Throwable ex) { + InstanceCommunication.FunctionStatus.ExceptionInformation info = + InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() + .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build(); + latestSystemExceptions.add(info); + } - public void resetCurrent() { - stats.update(currentStats); - currentStats.reset(); + + public void reset() { + statTotalProcessed.clear(); + statTotalProcessedSuccessfully.clear(); + statTotalSysExceptions.clear(); + statTotalUserExceptions.clear(); + statProcessLatency.clear(); + latestUserExceptions.clear(); + latestSystemExceptions.clear(); + lastInvocationTime = 0; } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 9f19095..9e5ae51 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -22,6 +22,7 @@ package org.apache.pulsar.functions.instance; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; +import io.prometheus.client.Summary; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -115,14 +116,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Sink sink; private final SecretsProvider secretsProvider; - - public static final String METRICS_TOTAL_PROCESSED = "__total_processed__"; - public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__"; - public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__"; - public static final String METRICS_TOTAL_USER_EXCEPTION = "__total_user_exceptions__"; - public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = "__total_deserialization_exceptions__"; - public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = "__total_serialization_exceptions__"; - public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__"; + private final String[] metricsLabels; public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, @@ -137,6 +131,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.stateStorageServiceUrl = stateStorageServiceUrl; this.stats = new FunctionStats(); this.secretsProvider = secretsProvider; + this.metricsLabels = new String[]{ + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), + String.valueOf(instanceConfig.getInstanceId()) + }; } /** @@ -208,23 +208,31 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - // process the message - long processAt = System.currentTimeMillis(); - stats.incrementProcessed(processAt); addLogTopicHandler(); JavaExecutionResult result; + // set last invocation time + stats.setLastInvocationTime(System.currentTimeMillis()); + + // start time for process latency stat + Summary.Timer requestTimer = stats.statProcessLatency.labels(metricsLabels).startTimer(); + + // process the message result = javaInstance.handleMessage(currentRecord, currentRecord.getValue()); + // register end time + requestTimer.observeDuration(); + // increment total processed + stats.statTotalProcessed.labels(metricsLabels).inc(); + removeLogTopicHandler(); - long doneProcessing = System.currentTimeMillis(); if (log.isDebugEnabled()) { log.debug("Got result: {}", result.getResult()); } try { - processResult(currentRecord, result, processAt, doneProcessing); + processResult(currentRecord, result); } catch (Exception e) { log.warn("Failed to process result of message {}", currentRecord, e); currentRecord.fail(); @@ -233,6 +241,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } catch (Throwable t) { log.error("[{}] Uncaught exception in Java Instance", functionName, t); deathException = t; + stats.statTotalSysExceptions.labels(metricsLabels).inc(); + stats.addSystemException(t); return; } finally { log.info("Closing instance"); @@ -314,18 +324,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } private void processResult(Record srcRecord, - JavaExecutionResult result, - long startTime, long endTime) throws Exception { + JavaExecutionResult result) throws Exception { if (result.getUserException() != null) { log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException()); - stats.incrementUserExceptions(result.getUserException()); + stats.statTotalUserExceptions.labels(metricsLabels).inc(); + stats.addUserException(result.getUserException() ); srcRecord.fail(); - } else if (result.getSystemException() != null) { - log.info("Encountered system exception when processing message {}", srcRecord, result.getSystemException()); - stats.incrementSystemExceptions(result.getSystemException()); - throw result.getSystemException(); } else { - stats.incrementSuccessfullyProcessed(endTime - startTime); + stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc(); if (result.getResult() != null) { sendOutputMessage(srcRecord, result.getResult()); } else { @@ -405,7 +411,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { public InstanceCommunication.MetricsData getAndResetMetrics() { InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); - stats.resetCurrent(); + stats.reset(); if (javaInstance != null) { InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics(); if (userMetrics != null) { @@ -427,42 +433,39 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } public void resetMetrics() { - stats.resetCurrent(); + stats.reset(); javaInstance.resetMetrics(); } private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - addSystemMetrics(METRICS_TOTAL_PROCESSED, stats.getStats().getTotalProcessed(), bldr); - addSystemMetrics(METRICS_TOTAL_SUCCESS, stats.getStats().getTotalSuccessfullyProcessed(), + addSystemMetrics("__total_processed__", stats.statTotalProcessed.labels(metricsLabels).get(), bldr); + addSystemMetrics("__total_successfully_processed__", stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr); + addSystemMetrics("__total_system_exceptions__", stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr); + addSystemMetrics("__total_user_exceptions__", stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr); + addSystemMetrics("__avg_latency_ms__", + stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0 + ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count, bldr); - addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, stats.getStats().getTotalSystemExceptions(), bldr); - addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, stats.getStats().getTotalUserExceptions(), bldr); - stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> { - addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, count, bldr); - }); - addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION, - stats.getStats().getTotalSerializationExceptions(), bldr); - addSystemMetrics(METRICS_AVG_LATENCY, stats.getStats().computeLatency(), bldr); return bldr; } public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed()); - functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalStats().getTotalSuccessfullyProcessed()); - functionStatusBuilder.setNumUserExceptions(stats.getTotalStats().getTotalUserExceptions()); - stats.getTotalStats().getLatestUserExceptions().forEach(ex -> { + functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get()); + functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get()); + functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get()); + stats.getLatestUserExceptions().forEach(ex -> { functionStatusBuilder.addLatestUserExceptions(ex); }); - functionStatusBuilder.setNumSystemExceptions(stats.getTotalStats().getTotalSystemExceptions()); - stats.getTotalStats().getLatestSystemExceptions().forEach(ex -> { + functionStatusBuilder.setNumSystemExceptions((long) stats.statTotalSysExceptions.labels(metricsLabels).get()); + stats.getLatestSystemExceptions().forEach(ex -> { functionStatusBuilder.addLatestSystemExceptions(ex); }); - functionStatusBuilder.putAllDeserializationExceptions(stats.getTotalStats().getTotalDeserializationExceptions()); - functionStatusBuilder.setSerializationExceptions(stats.getTotalStats().getTotalSerializationExceptions()); - functionStatusBuilder.setAverageLatency(stats.getTotalStats().computeLatency()); - functionStatusBuilder.setLastInvocationTime(stats.getTotalStats().getLastInvocationTime()); + functionStatusBuilder.setAverageLatency( + stats.statProcessLatency.labels(metricsLabels).get().count == 0.0 + ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count); + functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime()); return functionStatusBuilder; } diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 2438338..37a0cb2 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -34,6 +34,7 @@ import threading from functools import partial from collections import namedtuple from threading import Timer +from prometheus_client import Counter, Summary import traceback import sys import re @@ -69,67 +70,50 @@ def base64ify(bytes_or_str): # We keep track of the following metrics class Stats(object): - def __init__(self): - self.reset() + metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id'] + + TOTAL_PROCESSED = '__function_total_processed__' + TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__' + TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__' + TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__' + PROCESS_LATENCY_MS = '__function_process_latency_ms__' + + # Declare Prometheus + stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names) + stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED, + 'Total number of messages processed successfully.', metrics_label_names) + stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.', + metrics_label_names) + stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.', + metrics_label_names) + + stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names) + + latest_user_exception = [] + latest_sys_exception = [] + + last_invocation_time = 0.0 + + def add_user_exception(self): + self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000))) + if len(self.latest_sys_exception) > 10: + self.latest_sys_exception.pop(0) + + def add_sys_exception(self): + self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000))) + if len(self.latest_sys_exception) > 10: + self.latest_sys_exception.pop(0) def reset(self): - self.nprocessed = 0 - self.nsuccessfullyprocessed = 0 - self.nuserexceptions = 0 - self.latestuserexceptions = [] - self.nsystemexceptions = 0 - self.latestsystemexceptions = [] - self.ndeserialization_exceptions = {} - self.nserialization_exceptions = 0 - self.latency = 0 - self.lastinvocationtime = 0 - - def increment_deser_errors(self, topic): - if topic not in self.ndeserialization_exceptions: - self.ndeserialization_exceptions[topic] = 0 - self.ndeserialization_exceptions[topic] += 1 - - def increment_successfully_processed(self, latency): - self.nsuccessfullyprocessed += 1 - self.latency += latency - - def increment_processed(self, processed_at): - self.nprocessed += 1 - self.lastinvocationtime = processed_at - - def record_user_exception(self, ex): - self.latestuserexceptions.append((traceback.format_exc(), int(time.time() * 1000))) - if len(self.latestuserexceptions) > 10: - self.latestuserexceptions.pop(0) - self.nuserexceptions = self.nuserexceptions + 1 - - def record_system_exception(self, ex): - self.latestsystemexceptions.append((traceback.format_exc(), int(time.time() * 1000))) - if len(self.latestsystemexceptions) > 10: - self.latestsystemexceptions.pop(0) - self.nsystemexceptions = self.nsystemexceptions + 1 - - def compute_latency(self): - if self.nsuccessfullyprocessed <= 0: - return 0 - else: - return self.latency / self.nsuccessfullyprocessed - - def update(self, object): - self.nprocessed = object.nprocessed - self.nsuccessfullyprocessed = object.nsuccessfullyprocessed - self.nuserexceptions = object.nuserexceptions - self.nsystemexceptions = object.nsystemexceptions - self.nserialization_exceptions = object.nserialization_exceptions - self.latency = object.latency - self.lastinvocationtime = object.lastinvocationtime - self.latestuserexceptions = [] - self.latestsystemexceptions = [] - self.ndeserialization_exceptions.clear() - self.latestuserexceptions.append(object.latestuserexceptions) - self.latestsystemexceptions.append(object.latestsystemexceptions) - self.ndeserialization_exceptions.update(object.ndeserialization_exceptions) - + self.latest_user_exception.clear() + self.latest_sys_exception.clear() + self.stat_total_processed._value.set(0.0) + self.stat_total_processed_successfully._value.set(0.0) + self.stat_total_user_exceptions._value.set(0.0) + self.stat_total_sys_exceptions._value.set(0.0) + self.stats_process_latency_ms._sum.set(0) + self.stats_process_latency_ms._count.set(0); + self.last_invocation_time = 0.0 class PythonInstance(object): def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider): @@ -146,18 +130,17 @@ class PythonInstance(object): self.function_class = None self.function_purefunction = None self.producer = None - self.exeuction_thread = None + self.execution_thread = None self.atmost_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATMOST_ONCE') self.atleast_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE') self.auto_ack = self.instance_config.function_details.autoAck self.contextimpl = None - self.total_stats = Stats() - self.current_stats = Stats() self.stats = Stats() self.last_health_check_ts = time.time() self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None self.expected_healthcheck_interval = expected_healthcheck_interval self.secrets_provider = secrets_provider + self.metrics_labels = [function_details.tenant, function_details.namespace, function_details.name, instance_id] def health_check(self): self.last_health_check_ts = time.time() @@ -229,8 +212,8 @@ class PythonInstance(object): self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider) # Now launch a thread that does execution - self.exeuction_thread = threading.Thread(target=self.actual_execution) - self.exeuction_thread.start() + self.execution_thread = threading.Thread(target=self.actual_execution) + self.execution_thread.start() # start proccess spawner health check timer self.last_health_check_ts = time.time() @@ -239,49 +222,49 @@ class PythonInstance(object): def actual_execution(self): Log.debug("Started Thread for executing the function") + while True: - msg = self.queue.get(True) - if isinstance(msg, InternalQuitMessage): - break - user_exception = False - system_exception = False - Log.debug("Got a message from topic %s" % msg.topic) - input_object = None try: + msg = self.queue.get(True) + if isinstance(msg, InternalQuitMessage): + break + Log.debug("Got a message from topic %s" % msg.topic) + # deserialize message input_object = msg.serde.deserialize(msg.message.data()) - except: - self.current_stats.increment_deser_errors(msg.topic) - self.total_stats.increment_deser_errors(msg.topic) - continue - self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic) - output_object = None - self.saved_log_handler = None - if self.log_topic_handler is not None: - self.saved_log_handler = log.remove_all_handlers() - log.add_handler(self.log_topic_handler) - start_time = time.time() - self.current_stats.increment_processed(int(start_time) * 1000) - self.total_stats.increment_processed(int(start_time) * 1000) - successfully_executed = False - try: - if self.function_class is not None: - output_object = self.function_class.process(input_object, self.contextimpl) - else: - output_object = self.function_purefunction.process(input_object) - successfully_executed = True + # set current message in context + self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic) + output_object = None + self.saved_log_handler = None + if self.log_topic_handler is not None: + self.saved_log_handler = log.remove_all_handlers() + log.add_handler(self.log_topic_handler) + successfully_executed = False + try: + # get user function start time for statistic calculation + start_time = time.time() + self.stats.last_invocation_time = start_time * 1000.0 + if self.function_class is not None: + output_object = self.function_class.process(input_object, self.contextimpl) + else: + output_object = self.function_purefunction.process(input_object) + successfully_executed = True + Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time() - start_time) * 1000.0) + Stats.stat_total_processed.labels(*self.metrics_labels).inc() + except Exception as e: + Log.exception("Exception while executing user method") + Stats.stat_total_user_exceptions.labels(*self.metrics_labels).inc() + self.stats.add_user_exception() + + if self.log_topic_handler is not None: + log.remove_all_handlers() + log.add_handler(self.saved_log_handler) + if successfully_executed: + self.process_result(output_object, msg) + except Exception as e: - Log.exception("Exception while executing user method") - self.total_stats.record_user_exception(e) - self.current_stats.record_user_exception(e) - end_time = time.time() - latency = (end_time - start_time) * 1000 - self.total_stats.increment_successfully_processed(latency) - self.current_stats.increment_successfully_processed(latency) - if self.log_topic_handler is not None: - log.remove_all_handlers() - log.add_handler(self.saved_log_handler) - if successfully_executed: - self.process_result(output_object, msg) + Log.error("Uncaught exception in Python instance: %s" % e); + Stats.stat_total_sys_exceptions.labels(*self.metrics_labels).inc() + self.stats.add_sys_exception() def done_producing(self, consumer, orig_message, result, sent_message): if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once: @@ -289,23 +272,17 @@ class PythonInstance(object): def process_result(self, output, msg): if output is not None: - output_bytes = None if self.output_serde is None: self.setup_output_serde() if self.producer is None: self.setup_producer() - try: - output_bytes = self.output_serde.serialize(output) - except: - self.current_stats.nserialization_exceptions += 1 - self.total_stats.nserialization_exceptions += 1 + + # serialize function output + output_bytes = self.output_serde.serialize(output) + if output_bytes is not None: props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())} - try: - self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props) - except Exception as e: - self.current_stats.record_system_exception(e) - self.total_stats.record_system_exception(e) + self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props) elif self.auto_ack and self.atleast_once: msg.consumer.acknowledge(msg.message) @@ -343,25 +320,23 @@ class PythonInstance(object): return metrics def reset_metrics(self): - self.stats.update(self.current_stats) - self.current_stats.reset() + self.stats.reset() self.contextimpl.reset_metrics() def get_metrics(self): # First get any user metrics metrics = self.contextimpl.get_metrics() # Now add system metrics as well - self.add_system_metrics("__total_processed__", self.stats.nprocessed, metrics) - self.add_system_metrics("__total_successfully_processed__", self.stats.nsuccessfullyprocessed, metrics) - self.add_system_metrics("__total_system_exceptions__", self.stats.nsystemexceptions, metrics) - self.add_system_metrics("__total_user_exceptions__", self.stats.nuserexceptions, metrics) - for (topic, metric) in self.stats.ndeserialization_exceptions.items(): - self.add_system_metrics("__total_deserialization_exceptions__" + topic, metric, metrics) - self.add_system_metrics("__total_serialization_exceptions__", self.stats.nserialization_exceptions, metrics) - self.add_system_metrics("__avg_latency_ms__", self.stats.compute_latency(), metrics) + self.add_system_metrics("__total_processed__", Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics) + self.add_system_metrics("__total_successfully_processed__", Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(), metrics) + self.add_system_metrics("__total_system_exceptions__", Stats.stat_total_sys_exceptions._value.labels(*self.metrics_labels).get(), metrics) + self.add_system_metrics("__total_user_exceptions__", Stats.stat_total_user_exceptions._value.labels(*self.metrics_labels).get(), metrics) + self.add_system_metrics("__avg_latency_ms__", + 0.0 if Stats.stats_process_latency_ms._count.labels(*self.metrics_labels).get() <= 0.0 + else Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stats_process_latency_ms._count.labels(*self.metrics_labels).get(), + metrics) return metrics - def add_system_metrics(self, metric_name, value, metrics): metrics.metrics[metric_name].count = value metrics.metrics[metric_name].sum = value @@ -371,26 +346,25 @@ class PythonInstance(object): def get_function_status(self): status = InstanceCommunication_pb2.FunctionStatus() status.running = True - status.numProcessed = self.total_stats.nprocessed - status.numSuccessfullyProcessed = self.total_stats.nsuccessfullyprocessed - status.numUserExceptions = self.total_stats.nuserexceptions + status.numProcessed = long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()) + status.numSuccessfullyProcessed = long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()) + status.numUserExceptions = long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()) status.instanceId = self.instance_config.instance_id - for ex, tm in self.total_stats.latestuserexceptions: + for ex, tm in self.stats.latest_user_exception: to_add = status.latestUserExceptions.add() to_add.exceptionString = ex to_add.msSinceEpoch = tm - status.numSystemExceptions = self.total_stats.nsystemexceptions - for ex, tm in self.total_stats.latestsystemexceptions: + status.numSystemExceptions = long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()) + for ex, tm in self.stats.latest_sys_exception: to_add = status.latestSystemExceptions.add() to_add.exceptionString = ex to_add.msSinceEpoch = tm - for (topic, metric) in self.total_stats.ndeserialization_exceptions.items(): - status.deserializationExceptions[topic] = metric - status.serializationExceptions = self.total_stats.nserialization_exceptions - status.averageLatency = self.total_stats.compute_latency() - status.lastInvocationTime = self.total_stats.lastinvocationtime + status.averageLatency = 0.0 \ + if Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 \ + else Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() + status.lastInvocationTime = long(self.stats.last_invocation_time) return status def join(self): self.queue.put(InternalQuitMessage(True), True) - self.exeuction_thread.join() + self.execution_thread.join() diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 4c9c086..98986dd 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -29,6 +29,8 @@ import com.google.protobuf.util.JsonFormat; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; +import io.prometheus.client.exporter.HTTPServer; +import io.prometheus.client.hotspot.DefaultExports; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; @@ -192,6 +194,10 @@ public class JavaInstanceMain implements AutoCloseable { } } }); + + // registering jvm metrics to prometheus + DefaultExports.initialize(); + log.info("Starting runtimeSpawner"); runtimeSpawner.start(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 67a7aad..b11a65d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -713,26 +713,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() { return this.functionRuntimeInfoMap; } - - public void updateRates() { - if (runtimeFactory.externallyManaged()) { - // We don't do metrics management for externally managed functions - return; - } - for (Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) { - RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner(); - if (functionRuntimeSpawner != null) { - Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); - if (functionRuntime != null) { - try { - functionRuntime.resetMetrics().get(); - } catch (Exception e) { - log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage()); - } - } - } - } - } + /** * Private methods for internal use. Should not be used outside of this class */ diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 80c1b77..e497ff3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -58,9 +58,7 @@ public class FunctionsStatsGenerator { Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); if (functionRuntime != null) { try { - InstanceCommunication.MetricsData metrics = workerService.getWorkerConfig() - .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get() - : functionRuntime.getAndResetMetrics().get(); + InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get(); for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry : metrics.getMetricsMap().entrySet()) { String metricName = metricsEntry.getKey(); @@ -75,13 +73,13 @@ public class FunctionsStatsGenerator { int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); String qualifiedNamespace = String.format("%s/%s", tenant, namespace); - metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName), + metric(out, cluster, qualifiedNamespace, name, String.format("%scount", metricName), instanceId, dataDigest.getCount()); - metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName), + metric(out, cluster, qualifiedNamespace, name, String.format("%smax", metricName), instanceId, dataDigest.getMax()); - metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName), + metric(out, cluster, qualifiedNamespace,name, String.format("%smin", metricName), instanceId, dataDigest.getMin()); - metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName), + metric(out, cluster, qualifiedNamespace, name, String.format("%ssum", metricName), instanceId, dataDigest.getSum()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 5a95a00..4adf0d4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -77,7 +77,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private String clientAuthenticationParameters; // Frequency how often worker performs compaction on function-topics private long topicCompactionFrequencySec = 30 * 60; // 30 minutes - private int metricsSamplingPeriodSec = 60; /***** --- TLS --- ****/ // Enable TLS private boolean tlsEnabled = false; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 39a9041..43404ce 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -199,12 +199,6 @@ public class WorkerService { this.connectorsManager = new ConnectorsManager(workerConfig); - int metricsSamplingPeriodSec = this.workerConfig.getMetricsSamplingPeriodSec(); - if (metricsSamplingPeriodSec > 0) { - this.statsUpdater.scheduleAtFixedRate(() -> this.functionRuntimeManager.updateRates(), - metricsSamplingPeriodSec, metricsSamplingPeriodSec, TimeUnit.SECONDS); - } - } catch (Throwable t) { log.error("Error Starting up in worker", t); throw new RuntimeException(t); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index e5b67b7..88c0a17 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -168,9 +168,7 @@ public class WorkerImpl { Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); if (functionRuntime != null) { try { - InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig() - .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get() - : functionRuntime.getAndResetMetrics().get(); + InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get(); String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() .getFunctionDetails().getTenant(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 68a13b4..e35aa2b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -87,10 +87,10 @@ public class FunctionStatsGeneratorTest { CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>(); InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder() .putMetrics( - "__total_processed__", + "__function_total_processed__", InstanceCommunication.MetricsData.DataDigest.newBuilder() .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build()) - .putMetrics("__avg_latency_ms__", + .putMetrics("__function_process_latency_ms__", InstanceCommunication.MetricsData.DataDigest.newBuilder() .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build()) .build(); @@ -126,56 +126,57 @@ public class FunctionStatsGeneratorTest { Assert.assertEquals(metrics.size(), 8); - Metric m = metrics.get("pulsar_function__total_processed__count"); + System.out.println("metrics: " + metrics); + Metric m = metrics.get("__function_total_processed__count"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 100.0); - m = metrics.get("pulsar_function__total_processed__max"); + m = metrics.get("__function_total_processed__max"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 200.0); - m = metrics.get("pulsar_function__total_processed__sum"); + m = metrics.get("__function_total_processed__sum"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 300.0); - m = metrics.get("pulsar_function__total_processed__min"); + m = metrics.get("__function_total_processed__min"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 0.0); - m = metrics.get("pulsar_function__avg_latency_ms__count"); + m = metrics.get("__function_process_latency_ms__count"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 10.0); - m = metrics.get("pulsar_function__avg_latency_ms__max"); + m = metrics.get("__function_process_latency_ms__max"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 20.0); - m = metrics.get("pulsar_function__avg_latency_ms__sum"); + m = metrics.get("__function_process_latency_ms__sum"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 30.0); - m = metrics.get("pulsar_function__avg_latency_ms__min"); + m = metrics.get("__function_process_latency_ms__min"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1");