This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6583d2c  Refactor function metrics to use prometheus (#2914)
6583d2c is described below

commit 6583d2ceeb57bac60a92a3ab09df7f2cb384377e
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Nov 3 22:14:36 2018 -0700

    Refactor function metrics to use prometheus (#2914)
    
    * Beginning to refactor function metrics to use prometheus
    
    * fix unit test
    
    * removing test code
    
    * fix test
    
    * minor refactoring
    
    * addressing comments
    
    * adding python instance
    
    * remove test code
    
    * adding prometheus client as instance dependency
    
    * fix bug
    
    * adding prometheus to license file
---
 conf/functions_worker.yml                          |   1 -
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |   1 -
 pulsar-client-cpp/python/setup.py                  |   2 +-
 pulsar-functions/instance/pom.xml                  |  20 ++
 .../pulsar/functions/instance/FunctionStats.java   | 207 +++++++----------
 .../functions/instance/JavaInstanceRunnable.java   |  89 ++++----
 .../instance/src/main/python/python_instance.py    | 248 +++++++++------------
 .../pulsar/functions/runtime/JavaInstanceMain.java |   6 +
 .../functions/worker/FunctionRuntimeManager.java   |  21 +-
 .../functions/worker/FunctionsStatsGenerator.java  |  12 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   1 -
 .../pulsar/functions/worker/WorkerService.java     |   6 -
 .../functions/worker/rest/api/WorkerImpl.java      |   4 +-
 .../worker/FunctionStatsGeneratorTest.java         |  21 +-
 15 files changed, 288 insertions(+), 353 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index bd6281b..23c3bb8 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -65,7 +65,6 @@ assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
 # Frequency how often worker performs compaction on function-topics
 topicCompactionFrequencySec: 1800
-metricsSamplingPeriodSec: 60
 
 
 ###############################
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 9d4deb9..3823cc0 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -467,6 +467,8 @@ The Apache Software License, Version 2.0
     - io.dropwizard.metrics-metrics-core-3.1.0.jar
     - io.dropwizard.metrics-metrics-graphite-3.1.0.jar
     - io.dropwizard.metrics-metrics-jvm-3.1.0.jar
+  * Prometheus
+    - io.prometheus-simpleclient_httpserver-0.5.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 494e0db..e683b6d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -357,7 +357,6 @@ public class PulsarFunctionE2ETest {
         }, 5, 200);
 
         FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        functionRuntimeManager.updateRates();
         FunctionStatusList functionStats = 
functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
                 functionName, null);
 
diff --git a/pulsar-client-cpp/python/setup.py 
b/pulsar-client-cpp/python/setup.py
index 304d345..952c57b 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -70,6 +70,6 @@ setup(
     license="Apache License v2.0",
     url="http://pulsar.apache.org/",
     install_requires=[
-        'grpcio', 'protobuf'
+        'grpcio', 'protobuf', "prometheus_client"
     ],
 )
diff --git a/pulsar-functions/instance/pom.xml 
b/pulsar-functions/instance/pom.xml
index 8dec8dc..f734b03 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -107,6 +107,26 @@
       <artifactId>typetools</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
+    <!-- Hotspot JVM metrics-->
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_hotspot</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
+    <!-- Exposition HTTPServer-->
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_httpserver</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index f2195de..15b01f7 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -18,16 +18,15 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Function stats.
  */
@@ -35,132 +34,94 @@ import java.util.Map;
 @Getter
 @Setter
 public class FunctionStats {
+
+    private static final String[] metricsLabelNames = {"tenant", "namespace", 
"name", "instance_id"};
+
+    /** Declare Prometheus stats **/
+
+    final Counter statTotalProcessed;
+
+    final Counter statTotalProcessedSuccessfully;
+
+    final Counter statTotalSysExceptions;
+
+    final Counter statTotalUserExceptions;
+
+    final Summary statProcessLatency;
+
+    CollectorRegistry functionCollectorRegistry;
+
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions = EvictingQueue.create(10);
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
+
     @Getter
     @Setter
-    class Stats {
-        private long totalProcessed;
-        private long totalSuccessfullyProcessed;
-        private long totalUserExceptions;
-        private 
List<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions =
-                new LinkedList<>();
-        private long totalSystemExceptions;
-        private 
List<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions =
-                new LinkedList<>();
-        private Map<String, Long> totalDeserializationExceptions = new 
HashMap<>();
-        private long totalSerializationExceptions;
-        private long totalLatencyMs;
-        private long lastInvocationTime;
-
-        public void incrementProcessed(long processedAt) {
-            totalProcessed++;
-            lastInvocationTime = processedAt;
-        }
-        public void incrementSuccessfullyProcessed(long latency) {
-            totalSuccessfullyProcessed++;
-            totalLatencyMs += latency;
-        }
-        public void incrementUserExceptions(Exception ex) {
-            InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                    
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                    
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
-            latestUserExceptions.add(info);
-            if (latestUserExceptions.size() > 10) {
-                latestUserExceptions.remove(0);
-            }
-            totalUserExceptions++;
-        }
-        public void incrementSystemExceptions(Exception ex) {
-            InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                    
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                            
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
-            latestSystemExceptions.add(info);
-            if (latestSystemExceptions.size() > 10) {
-                latestSystemExceptions.remove(0);
-            }
-            totalSystemExceptions++;
-        }
-        public void incrementDeserializationExceptions(String topic) {
-            if (!totalDeserializationExceptions.containsKey(topic)) {
-                totalDeserializationExceptions.put(topic, 0l);
-            }
-            totalDeserializationExceptions.put(topic, 
totalDeserializationExceptions.get(topic) + 1);
-        }
-        public void incrementSerializationExceptions() { 
totalSerializationExceptions++; }
-        public void reset() {
-            totalProcessed = 0;
-            totalSuccessfullyProcessed = 0;
-            totalUserExceptions = 0;
-            totalSystemExceptions = 0;
-            totalDeserializationExceptions.clear();
-            totalSerializationExceptions = 0;
-            totalLatencyMs = 0;
-        }
-        public double computeLatency() {
-            if (totalSuccessfullyProcessed <= 0) {
-                return 0;
-            } else {
-                return totalLatencyMs / (double) totalSuccessfullyProcessed;
-            }
-        }
-        
-        public void update(Stats stats) {
-            if (stats == null) {
-                return;
-            }
-            this.totalProcessed = stats.totalProcessed;
-            this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed;
-            this.totalUserExceptions = stats.totalUserExceptions;
-            this.latestUserExceptions.clear();
-            this.latestSystemExceptions.clear();
-            this.totalDeserializationExceptions.clear();
-            this.latestUserExceptions.addAll(stats.latestUserExceptions);
-            this.latestSystemExceptions.addAll(stats.latestSystemExceptions);
-            
this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions);
-            this.totalSystemExceptions = stats.totalSystemExceptions;
-            this.latestSystemExceptions = stats.latestSystemExceptions;
-            this.totalSerializationExceptions = 
stats.totalSerializationExceptions;
-            this.totalLatencyMs = stats.totalLatencyMs;
-            this.lastInvocationTime = stats.lastInvocationTime;
-        }
-    }
+    private long lastInvocationTime = 0;
 
-    private Stats currentStats;
-    private Stats totalStats;
-    private Stats stats;
-    
     public FunctionStats() {
-        currentStats = new Stats();
-        stats = new Stats();
-        totalStats = new Stats();
-    }
+        // Declare function local collector registry so that it will not clash 
with other function instances'
+        // metrics collection especially in threaded mode
+        functionCollectorRegistry = new CollectorRegistry();
 
-    public void incrementProcessed(long processedAt) {
-        currentStats.incrementProcessed(processedAt);
-        totalStats.incrementProcessed(processedAt);
-    }
+        statTotalProcessed = Counter.build()
+                .name("__function_total_processed__")
+                .help("Total number of messages processed.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
 
-    public void incrementSuccessfullyProcessed(long latency) {
-        currentStats.incrementSuccessfullyProcessed(latency);
-        totalStats.incrementSuccessfullyProcessed(latency);
-    }
-    public void incrementUserExceptions(Exception ex) {
-        currentStats.incrementUserExceptions(ex);
-        totalStats.incrementUserExceptions(ex);
-    }
-    public void incrementSystemExceptions(Exception ex) {
-        currentStats.incrementSystemExceptions(ex);
-        totalStats.incrementSystemExceptions(ex);
+        statTotalProcessedSuccessfully = Counter.build()
+                .name("__function_total_successfully_processed__")
+                .help("Total number of messages processed successfully.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statTotalSysExceptions = Counter.build()
+                .name("__function_total_system_exceptions__")
+                .help("Total number of system exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statTotalUserExceptions = Counter.build()
+                .name("__function_total_user_exceptions__")
+                .help("Total number of user exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statProcessLatency = Summary.build()
+                .name("__function_process_latency_ms__").help("Process latency 
in milliseconds.")
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
     }
-    public void incrementDeserializationExceptions(String topic) {
-        currentStats.incrementDeserializationExceptions(topic);
-        totalStats.incrementDeserializationExceptions(topic);
+
+    public void addUserException(Exception ex) {
+        InstanceCommunication.FunctionStatus.ExceptionInformation info =
+                    
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                    
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+        latestUserExceptions.add(info);
     }
-    public void incrementSerializationExceptions() {
-        currentStats.incrementSerializationExceptions();
-        totalStats.incrementSerializationExceptions();
+
+    public void addSystemException(Throwable ex) {
+        InstanceCommunication.FunctionStatus.ExceptionInformation info =
+                
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        
.setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+        latestSystemExceptions.add(info);
+
     }
-    public void resetCurrent() {
-        stats.update(currentStats);
-        currentStats.reset();
+
+    public void reset() {
+        statTotalProcessed.clear();
+        statTotalProcessedSuccessfully.clear();
+        statTotalSysExceptions.clear();
+        statTotalUserExceptions.clear();
+        statProcessLatency.clear();
+        latestUserExceptions.clear();
+        latestSystemExceptions.clear();
+        lastInvocationTime = 0;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 9f19095..9e5ae51 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.instance;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Summary;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -115,14 +116,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     private Sink sink;
 
     private final SecretsProvider secretsProvider;
-
-    public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
-    public static final String METRICS_TOTAL_SUCCESS = 
"__total_successfully_processed__";
-    public static final String METRICS_TOTAL_SYS_EXCEPTION = 
"__total_system_exceptions__";
-    public static final String METRICS_TOTAL_USER_EXCEPTION = 
"__total_user_exceptions__";
-    public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = 
"__total_deserialization_exceptions__";
-    public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = 
"__total_serialization_exceptions__";
-    public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__";
+    private final String[] metricsLabels;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -137,6 +131,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
         this.secretsProvider = secretsProvider;
+        this.metricsLabels = new String[]{
+                instanceConfig.getFunctionDetails().getTenant(),
+                instanceConfig.getFunctionDetails().getNamespace(),
+                instanceConfig.getFunctionDetails().getName(),
+                String.valueOf(instanceConfig.getInstanceId())
+        };
     }
 
     /**
@@ -208,23 +208,31 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     }
                 }
 
-                // process the message
-                long processAt = System.currentTimeMillis();
-                stats.incrementProcessed(processAt);
                 addLogTopicHandler();
                 JavaExecutionResult result;
 
+                // set last invocation time
+                stats.setLastInvocationTime(System.currentTimeMillis());
+
+                // start time for process latency stat
+                Summary.Timer requestTimer = 
stats.statProcessLatency.labels(metricsLabels).startTimer();
+
+                // process the message
                 result = javaInstance.handleMessage(currentRecord, 
currentRecord.getValue());
 
+                // register end time
+                requestTimer.observeDuration();
+                // increment total processed
+                stats.statTotalProcessed.labels(metricsLabels).inc();
+
                 removeLogTopicHandler();
 
-                long doneProcessing = System.currentTimeMillis();
                 if (log.isDebugEnabled()) {
                     log.debug("Got result: {}", result.getResult());
                 }
 
                 try {
-                    processResult(currentRecord, result, processAt, 
doneProcessing);
+                    processResult(currentRecord, result);
                 } catch (Exception e) {
                     log.warn("Failed to process result of message {}", 
currentRecord, e);
                     currentRecord.fail();
@@ -233,6 +241,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         } catch (Throwable t) {
             log.error("[{}] Uncaught exception in Java Instance", 
functionName, t);
             deathException = t;
+            stats.statTotalSysExceptions.labels(metricsLabels).inc();
+            stats.addSystemException(t);
             return;
         } finally {
             log.info("Closing instance");
@@ -314,18 +324,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     private void processResult(Record srcRecord,
-                               JavaExecutionResult result,
-                               long startTime, long endTime) throws Exception {
+                               JavaExecutionResult result) throws Exception {
         if (result.getUserException() != null) {
             log.info("Encountered user exception when processing message {}", 
srcRecord, result.getUserException());
-            stats.incrementUserExceptions(result.getUserException());
+            stats.statTotalUserExceptions.labels(metricsLabels).inc();
+            stats.addUserException(result.getUserException() );
             srcRecord.fail();
-        } else if (result.getSystemException() != null) {
-            log.info("Encountered system exception when processing message 
{}", srcRecord, result.getSystemException());
-            stats.incrementSystemExceptions(result.getSystemException());
-            throw result.getSystemException();
         } else {
-            stats.incrementSuccessfullyProcessed(endTime - startTime);
+            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
             if (result.getResult() != null) {
                 sendOutputMessage(srcRecord, result.getResult());
             } else {
@@ -405,7 +411,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         InstanceCommunication.MetricsData.Builder bldr = 
createMetricsDataBuilder();
-        stats.resetCurrent();
+        stats.reset();
         if (javaInstance != null) {
             InstanceCommunication.MetricsData userMetrics =  
javaInstance.getAndResetMetrics();
             if (userMetrics != null) {
@@ -427,42 +433,39 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     public void resetMetrics() {
-        stats.resetCurrent();
+        stats.reset();
         javaInstance.resetMetrics();
     }
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics(METRICS_TOTAL_PROCESSED, 
stats.getStats().getTotalProcessed(), bldr);
-        addSystemMetrics(METRICS_TOTAL_SUCCESS, 
stats.getStats().getTotalSuccessfullyProcessed(),
+        addSystemMetrics("__total_processed__", 
stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_successfully_processed__", 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_system_exceptions__",  
stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_user_exceptions__", 
stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__avg_latency_ms__",
+                stats.statProcessLatency.labels(metricsLabels).get().count <= 
0.0
+                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count,
                 bldr);
-        addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, 
stats.getStats().getTotalSystemExceptions(), bldr);
-        addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, 
stats.getStats().getTotalUserExceptions(), bldr);
-        stats.getStats().getTotalDeserializationExceptions().forEach((topic, 
count) -> {
-            addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, 
count, bldr);
-        });
-        addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION,
-                stats.getStats().getTotalSerializationExceptions(), bldr);
-        addSystemMetrics(METRICS_AVG_LATENCY, 
stats.getStats().computeLatency(), bldr);
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
-        
functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
-        
functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalStats().getTotalSuccessfullyProcessed());
-        
functionStatusBuilder.setNumUserExceptions(stats.getTotalStats().getTotalUserExceptions());
-        stats.getTotalStats().getLatestUserExceptions().forEach(ex -> {
+        
functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
+        
functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+        
functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+        stats.getLatestUserExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestUserExceptions(ex);
         });
-        
functionStatusBuilder.setNumSystemExceptions(stats.getTotalStats().getTotalSystemExceptions());
-        stats.getTotalStats().getLatestSystemExceptions().forEach(ex -> {
+        functionStatusBuilder.setNumSystemExceptions((long) 
stats.statTotalSysExceptions.labels(metricsLabels).get());
+        stats.getLatestSystemExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestSystemExceptions(ex);
         });
-        
functionStatusBuilder.putAllDeserializationExceptions(stats.getTotalStats().getTotalDeserializationExceptions());
-        
functionStatusBuilder.setSerializationExceptions(stats.getTotalStats().getTotalSerializationExceptions());
-        
functionStatusBuilder.setAverageLatency(stats.getTotalStats().computeLatency());
-        
functionStatusBuilder.setLastInvocationTime(stats.getTotalStats().getLastInvocationTime());
+        functionStatusBuilder.setAverageLatency(
+                stats.statProcessLatency.labels(metricsLabels).get().count == 
0.0
+                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency.labels(metricsLabels).get().count);
+        
functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
         return functionStatusBuilder;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 2438338..37a0cb2 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -34,6 +34,7 @@ import threading
 from functools import partial
 from collections import namedtuple
 from threading import Timer
+from prometheus_client import Counter, Summary
 import traceback
 import sys
 import re
@@ -69,67 +70,50 @@ def base64ify(bytes_or_str):
 
 # We keep track of the following metrics
 class Stats(object):
-  def __init__(self):
-    self.reset()
+  metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
+
+  TOTAL_PROCESSED = '__function_total_processed__'
+  TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
+  TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
+  TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
+  PROCESS_LATENCY_MS = '__function_process_latency_ms__'
+
+  # Declare Prometheus
+  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages 
processed.', metrics_label_names)
+  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+                                              'Total number of messages 
processed successfully.', metrics_label_names)
+  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number 
of system exceptions.',
+                                      metrics_label_names)
+  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of 
user exceptions.',
+                                       metrics_label_names)
+
+  stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in 
milliseconds.', metrics_label_names)
+
+  latest_user_exception = []
+  latest_sys_exception = []
+
+  last_invocation_time = 0.0
+
+  def add_user_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
+
+  def add_sys_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
 
   def reset(self):
-    self.nprocessed = 0
-    self.nsuccessfullyprocessed = 0
-    self.nuserexceptions = 0
-    self.latestuserexceptions = []
-    self.nsystemexceptions = 0
-    self.latestsystemexceptions = []
-    self.ndeserialization_exceptions = {}
-    self.nserialization_exceptions = 0
-    self.latency = 0
-    self.lastinvocationtime = 0
-
-  def increment_deser_errors(self, topic):
-    if topic not in self.ndeserialization_exceptions:
-      self.ndeserialization_exceptions[topic] = 0
-    self.ndeserialization_exceptions[topic] += 1
-
-  def increment_successfully_processed(self, latency):
-    self.nsuccessfullyprocessed += 1
-    self.latency += latency
-
-  def increment_processed(self, processed_at):
-    self.nprocessed += 1
-    self.lastinvocationtime = processed_at
-
-  def record_user_exception(self, ex):
-    self.latestuserexceptions.append((traceback.format_exc(), int(time.time() 
* 1000)))
-    if len(self.latestuserexceptions) > 10:
-      self.latestuserexceptions.pop(0)
-    self.nuserexceptions = self.nuserexceptions + 1
-
-  def record_system_exception(self, ex):
-    self.latestsystemexceptions.append((traceback.format_exc(), 
int(time.time() * 1000)))
-    if len(self.latestsystemexceptions) > 10:
-      self.latestsystemexceptions.pop(0)
-    self.nsystemexceptions = self.nsystemexceptions + 1
-
-  def compute_latency(self):
-    if self.nsuccessfullyprocessed <= 0:
-      return 0
-    else:
-      return self.latency / self.nsuccessfullyprocessed
-
-  def update(self, object):
-    self.nprocessed = object.nprocessed
-    self.nsuccessfullyprocessed = object.nsuccessfullyprocessed
-    self.nuserexceptions = object.nuserexceptions
-    self.nsystemexceptions = object.nsystemexceptions
-    self.nserialization_exceptions = object.nserialization_exceptions
-    self.latency = object.latency
-    self.lastinvocationtime = object.lastinvocationtime
-    self.latestuserexceptions = []
-    self.latestsystemexceptions = []
-    self.ndeserialization_exceptions.clear()
-    self.latestuserexceptions.append(object.latestuserexceptions)
-    self.latestsystemexceptions.append(object.latestsystemexceptions)
-    self.ndeserialization_exceptions.update(object.ndeserialization_exceptions)
-    
+    self.latest_user_exception.clear()
+    self.latest_sys_exception.clear()
+    self.stat_total_processed._value.set(0.0)
+    self.stat_total_processed_successfully._value.set(0.0)
+    self.stat_total_user_exceptions._value.set(0.0)
+    self.stat_total_sys_exceptions._value.set(0.0)
+    self.stats_process_latency_ms._sum.set(0)
+    self.stats_process_latency_ms._count.set(0);
+    self.last_invocation_time = 0.0
 
 class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, expected_healthcheck_interval, 
user_code, pulsar_client, secrets_provider):
@@ -146,18 +130,17 @@ class PythonInstance(object):
     self.function_class = None
     self.function_purefunction = None
     self.producer = None
-    self.exeuction_thread = None
+    self.execution_thread = None
     self.atmost_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.ProcessingGuarantees.Value('ATMOST_ONCE')
     self.atleast_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE')
     self.auto_ack = self.instance_config.function_details.autoAck
     self.contextimpl = None
-    self.total_stats = Stats()
-    self.current_stats = Stats()
     self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
+    self.metrics_labels = [function_details.tenant, 
function_details.namespace, function_details.name, instance_id]
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -229,8 +212,8 @@ class PythonInstance(object):
 
     self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
     # Now launch a thread that does execution
-    self.exeuction_thread = threading.Thread(target=self.actual_execution)
-    self.exeuction_thread.start()
+    self.execution_thread = threading.Thread(target=self.actual_execution)
+    self.execution_thread.start()
 
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
@@ -239,49 +222,49 @@ class PythonInstance(object):
 
   def actual_execution(self):
     Log.debug("Started Thread for executing the function")
+
     while True:
-      msg = self.queue.get(True)
-      if isinstance(msg, InternalQuitMessage):
-        break
-      user_exception = False
-      system_exception = False
-      Log.debug("Got a message from topic %s" % msg.topic)
-      input_object = None
       try:
+        msg = self.queue.get(True)
+        if isinstance(msg, InternalQuitMessage):
+          break
+        Log.debug("Got a message from topic %s" % msg.topic)
+        # deserialize message
         input_object = msg.serde.deserialize(msg.message.data())
-      except:
-        self.current_stats.increment_deser_errors(msg.topic)
-        self.total_stats.increment_deser_errors(msg.topic)
-        continue
-      self.contextimpl.set_current_message_context(msg.message.message_id(), 
msg.topic)
-      output_object = None
-      self.saved_log_handler = None
-      if self.log_topic_handler is not None:
-        self.saved_log_handler = log.remove_all_handlers()
-        log.add_handler(self.log_topic_handler)
-      start_time = time.time()
-      self.current_stats.increment_processed(int(start_time) * 1000)
-      self.total_stats.increment_processed(int(start_time) * 1000)
-      successfully_executed = False
-      try:
-        if self.function_class is not None:
-          output_object = self.function_class.process(input_object, 
self.contextimpl)
-        else:
-          output_object = self.function_purefunction.process(input_object)
-        successfully_executed = True
+        # set current message in context
+        self.contextimpl.set_current_message_context(msg.message.message_id(), 
msg.topic)
+        output_object = None
+        self.saved_log_handler = None
+        if self.log_topic_handler is not None:
+          self.saved_log_handler = log.remove_all_handlers()
+          log.add_handler(self.log_topic_handler)
+        successfully_executed = False
+        try:
+          # get user function start time for statistic calculation
+          start_time = time.time()
+          self.stats.last_invocation_time = start_time * 1000.0
+          if self.function_class is not None:
+            output_object = self.function_class.process(input_object, 
self.contextimpl)
+          else:
+            output_object = self.function_purefunction.process(input_object)
+          successfully_executed = True
+          
Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time()
 - start_time) * 1000.0)
+          Stats.stat_total_processed.labels(*self.metrics_labels).inc()
+        except Exception as e:
+          Log.exception("Exception while executing user method")
+          Stats.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
+          self.stats.add_user_exception()
+
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
+        if successfully_executed:
+          self.process_result(output_object, msg)
+
       except Exception as e:
-        Log.exception("Exception while executing user method")
-        self.total_stats.record_user_exception(e)
-        self.current_stats.record_user_exception(e)
-      end_time = time.time()
-      latency = (end_time - start_time) * 1000
-      self.total_stats.increment_successfully_processed(latency)
-      self.current_stats.increment_successfully_processed(latency)
-      if self.log_topic_handler is not None:
-        log.remove_all_handlers()
-        log.add_handler(self.saved_log_handler)
-      if successfully_executed:
-        self.process_result(output_object, msg)
+        Log.error("Uncaught exception in Python instance: %s" % e);
+        Stats.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
+        self.stats.add_sys_exception()
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
@@ -289,23 +272,17 @@ class PythonInstance(object):
 
   def process_result(self, output, msg):
     if output is not None:
-      output_bytes = None
       if self.output_serde is None:
         self.setup_output_serde()
       if self.producer is None:
         self.setup_producer()
-      try:
-        output_bytes = self.output_serde.serialize(output)
-      except:
-        self.current_stats.nserialization_exceptions += 1
-        self.total_stats.nserialization_exceptions += 1
+
+      # serialize function output
+      output_bytes = self.output_serde.serialize(output)
+
       if output_bytes is not None:
         props = {"__pfn_input_topic__" : str(msg.topic), 
"__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())}
-        try:
-          self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message), properties=props)
-        except Exception as e:
-          self.current_stats.record_system_exception(e)
-          self.total_stats.record_system_exception(e)
+        self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message), properties=props)
     elif self.auto_ack and self.atleast_once:
       msg.consumer.acknowledge(msg.message)
 
@@ -343,25 +320,23 @@ class PythonInstance(object):
     return metrics
 
   def reset_metrics(self):
-    self.stats.update(self.current_stats)
-    self.current_stats.reset()
+    self.stats.reset()
     self.contextimpl.reset_metrics()
 
   def get_metrics(self):
     # First get any user metrics
     metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", self.stats.nprocessed, 
metrics)
-    self.add_system_metrics("__total_successfully_processed__", 
self.stats.nsuccessfullyprocessed, metrics)
-    self.add_system_metrics("__total_system_exceptions__", 
self.stats.nsystemexceptions, metrics)
-    self.add_system_metrics("__total_user_exceptions__", 
self.stats.nuserexceptions, metrics)
-    for (topic, metric) in self.stats.ndeserialization_exceptions.items():
-      self.add_system_metrics("__total_deserialization_exceptions__" + topic, 
metric, metrics)
-    self.add_system_metrics("__total_serialization_exceptions__", 
self.stats.nserialization_exceptions, metrics)
-    self.add_system_metrics("__avg_latency_ms__", 
self.stats.compute_latency(), metrics)
+    self.add_system_metrics("__total_processed__", 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics("__total_successfully_processed__", 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(),
 metrics)
+    self.add_system_metrics("__total_system_exceptions__", 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics("__total_user_exceptions__", 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), 
metrics)
+    self.add_system_metrics("__avg_latency_ms__",
+                            0.0 if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
+                            else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get(),
+                            metrics)
     return metrics
 
-
   def add_system_metrics(self, metric_name, value, metrics):
     metrics.metrics[metric_name].count = value
     metrics.metrics[metric_name].sum = value
@@ -371,26 +346,25 @@ class PythonInstance(object):
   def get_function_status(self):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
-    status.numProcessed = self.total_stats.nprocessed
-    status.numSuccessfullyProcessed = self.total_stats.nsuccessfullyprocessed
-    status.numUserExceptions = self.total_stats.nuserexceptions
+    status.numProcessed = 
long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
+    status.numSuccessfullyProcessed = 
long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
+    status.numUserExceptions = 
long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
     status.instanceId = self.instance_config.instance_id
-    for ex, tm in self.total_stats.latestuserexceptions:
+    for ex, tm in self.stats.latest_user_exception:
       to_add = status.latestUserExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.numSystemExceptions = self.total_stats.nsystemexceptions
-    for ex, tm in self.total_stats.latestsystemexceptions:
+    status.numSystemExceptions = 
long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+    for ex, tm in self.stats.latest_sys_exception:
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    for (topic, metric) in 
self.total_stats.ndeserialization_exceptions.items():
-      status.deserializationExceptions[topic] = metric
-    status.serializationExceptions = self.total_stats.nserialization_exceptions
-    status.averageLatency = self.total_stats.compute_latency()
-    status.lastInvocationTime = self.total_stats.lastinvocationtime
+    status.averageLatency = 0.0 \
+      if 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 
\
+      else 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / 
Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    status.lastInvocationTime = long(self.stats.last_invocation_time)
     return status
 
   def join(self):
     self.queue.put(InternalQuitMessage(True), True)
-    self.exeuction_thread.join()
+    self.execution_thread.join()
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 4c9c086..98986dd 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,6 +29,8 @@ import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -192,6 +194,10 @@ public class JavaInstanceMain implements AutoCloseable {
                 }
             }
         });
+
+        // registering jvm metrics to prometheus
+        DefaultExports.initialize();
+
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 67a7aad..b11a65d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -713,26 +713,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
         return this.functionRuntimeInfoMap;
     }
-    
-    public void updateRates() {
-        if (runtimeFactory.externallyManaged()) {
-            // We don't do metrics management for externally managed functions
-            return;
-        }
-        for (Entry<String, FunctionRuntimeInfo> entry : 
this.functionRuntimeInfoMap.entrySet()) {
-            RuntimeSpawner functionRuntimeSpawner = 
entry.getValue().getRuntimeSpawner();
-            if (functionRuntimeSpawner != null) {
-                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
-                if (functionRuntime != null) {
-                    try {
-                        functionRuntime.resetMetrics().get();
-                    } catch (Exception e) {
-                        log.error("Failed to update stats for {}-{}", 
entry.getKey(), e.getMessage());
-                    }
-                }
-            }
-        }
-    }
+
     /**
      * Private methods for internal use.  Should not be used outside of this 
class
      */
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 80c1b77..e497ff3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -58,9 +58,7 @@ public class FunctionsStatsGenerator {
                     Runtime functionRuntime = 
functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = 
workerService.getWorkerConfig()
-                                    .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
-                                            : 
functionRuntime.getAndResetMetrics().get();
+                            InstanceCommunication.MetricsData metrics = 
functionRuntime.getMetrics().get();
                             for (Map.Entry<String, 
InstanceCommunication.MetricsData.DataDigest> metricsEntry
                                     : metrics.getMetricsMap().entrySet()) {
                                 String metricName = metricsEntry.getKey();
@@ -75,13 +73,13 @@ public class FunctionsStatsGenerator {
                                 int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
                                 String qualifiedNamespace = 
String.format("%s/%s", tenant, namespace);
 
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("pulsar_function%scount", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%scount", metricName),
                                         instanceId, dataDigest.getCount());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("pulsar_function%smax", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%smax", metricName),
                                         instanceId, dataDigest.getMax());
-                                metric(out, cluster, qualifiedNamespace,name, 
String.format("pulsar_function%smin", metricName),
+                                metric(out, cluster, qualifiedNamespace,name, 
String.format("%smin", metricName),
                                         instanceId, dataDigest.getMin());
-                                metric(out, cluster, qualifiedNamespace, name, 
String.format("pulsar_function%ssum", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, 
String.format("%ssum", metricName),
                                         instanceId, dataDigest.getSum());
 
                             }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 5a95a00..4adf0d4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -77,7 +77,6 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private String clientAuthenticationParameters;
     // Frequency how often worker performs compaction on function-topics
     private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
-    private int metricsSamplingPeriodSec = 60;
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 39a9041..43404ce 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -199,12 +199,6 @@ public class WorkerService {
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
-            int metricsSamplingPeriodSec = 
this.workerConfig.getMetricsSamplingPeriodSec();
-            if (metricsSamplingPeriodSec > 0) {
-                this.statsUpdater.scheduleAtFixedRate(() -> 
this.functionRuntimeManager.updateRates(),
-                        metricsSamplingPeriodSec, metricsSamplingPeriodSec, 
TimeUnit.SECONDS);
-            }
-
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index e5b67b7..88c0a17 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -168,9 +168,7 @@ public class WorkerImpl {
                 Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
                 if (functionRuntime != null) {
                     try {
-                        InstanceCommunication.MetricsData metricsData = 
workerService.getWorkerConfig()
-                                .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
-                                : functionRuntime.getAndResetMetrics().get();
+                        InstanceCommunication.MetricsData metricsData = 
functionRuntime.getMetrics().get();
 
                         String tenant = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                                 .getFunctionDetails().getTenant();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 68a13b4..e35aa2b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public class FunctionStatsGeneratorTest {
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
                 .putMetrics(
-                        "__total_processed__",
+                        "__function_total_processed__",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
-                .putMetrics("__avg_latency_ms__",
+                .putMetrics("__function_process_latency_ms__",
                         
InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 
.setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
                 .build();
@@ -126,56 +126,57 @@ public class FunctionStatsGeneratorTest {
 
         Assert.assertEquals(metrics.size(), 8);
 
-        Metric m = metrics.get("pulsar_function__total_processed__count");
+        System.out.println("metrics: " + metrics);
+        Metric m = metrics.get("__function_total_processed__count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 100.0);
 
-        m = metrics.get("pulsar_function__total_processed__max");
+        m = metrics.get("__function_total_processed__max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 200.0);
 
-        m = metrics.get("pulsar_function__total_processed__sum");
+        m = metrics.get("__function_total_processed__sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 300.0);
 
-        m = metrics.get("pulsar_function__total_processed__min");
+        m = metrics.get("__function_total_processed__min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 0.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__count");
+        m = metrics.get("__function_process_latency_ms__count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 10.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__max");
+        m = metrics.get("__function_process_latency_ms__max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 20.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__sum");
+        m = metrics.get("__function_process_latency_ms__sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 30.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__min");
+        m = metrics.get("__function_process_latency_ms__min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");

Reply via email to