This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 859c914   adding windowed metrics for functions (#3021)
859c914 is described below

commit 859c914a7a3f3ddb9f89c62eba3c048039bd852a
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Nov 21 11:08:09 2018 -0800

     adding windowed metrics for functions (#3021)
    
    * adding windowed metrics for functions
    
    * adding license headers and cleaning up
    
    * remove unnecessary import
    
    * add RestException
    
    * fixing bugs and refactoring code
    
    * fix bug in instanceCache
    
    * fix bug
    
    * add test for stats and fix minor bug
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |   4 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  97 ++++++++-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   5 +-
 .../pulsar/common/policies/data/FunctionStats.java |  30 ++-
 .../functions/instance/FunctionStatsManager.java   | 239 +++++++++++++++++++--
 .../pulsar/functions/instance/InstanceCache.java   |  51 +++++
 .../functions/instance/JavaInstanceRunnable.java   |  89 +++++---
 .../src/main/python/InstanceCommunication_pb2.py   |  80 +++++--
 .../instance/src/main/python/function_stats.py     | 132 +++++++++++-
 .../instance/src/main/python/python_instance.py    |  83 ++++---
 pulsar-functions/instance/src/main/python/util.py  |  19 ++
 .../src/main/proto/InstanceCommunication.proto     |  12 ++
 .../pulsar/functions/runtime/RuntimeSpawner.java   |   2 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |  46 ++--
 .../functions/runtime/ThreadRuntimeFactory.java    |   4 +
 .../org/apache/pulsar/functions/utils/Utils.java   |  13 ++
 .../functions/worker/FunctionRuntimeManager.java   |  27 ++-
 .../pulsar/functions/worker/MembershipManager.java |   4 +-
 .../pulsar/functions/worker/SchedulerManager.java  |   5 +-
 .../org/apache/pulsar/functions/worker/Utils.java  |  20 +-
 .../functions/worker/rest/RestException.java       |  78 +++++++
 .../functions/worker/rest/api/FunctionsImpl.java   |  46 ++--
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  12 +-
 .../worker/FunctionRuntimeManagerTest.java         |   7 +-
 .../worker/FunctionStatsGeneratorTest.java         |   1 +
 .../functions/worker/SchedulerManagerTest.java     |  33 +--
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  10 +-
 27 files changed, 925 insertions(+), 224 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 6f7e43e..84cf8e0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -190,7 +190,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
     @Path("/{tenant}/{namespace}/{functionName}/stats")
-    public Response getFunctionStats(final @PathParam("tenant") String tenant,
+    public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
                                      final @PathParam("functionName") String 
functionName) throws IOException {
         return functions.getFunctionStats(tenant, namespace, functionName, 
FunctionsImpl.FUNCTION, uri.getRequestUri());
@@ -206,7 +206,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public Response getFunctionInstanceStats(final @PathParam("tenant") String 
tenant,
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
                                              final @PathParam("namespace") 
String namespace,
                                              final @PathParam("functionName") 
String functionName,
                                              final @PathParam("instanceId") 
String instanceId) throws IOException {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index e683b6d..4dbdf3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -32,6 +32,7 @@ import java.net.URL;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -357,13 +359,102 @@ public class PulsarFunctionE2ETest {
         }, 5, 200);
 
         FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStatusList functionStats = 
functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
+        FunctionStats functionStats = 
functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+                functionName, null);
+        
+        FunctionStats functionStatsFromAdmin = 
admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName);
+
+        assertEquals(functionStats, functionStatsFromAdmin);
+
+        assertEquals(functionStats.getReceivedTotal(), totalMsgs);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), totalMsgs);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.avgProcessLatency > 0);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), totalMsgs);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 
totalMsgs);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+        assertEquals(functionStats.getAvgProcessLatency(), 
functionStats.oneMin.getAvgProcessLatency());
+        assertTrue(functionStats.getLastInvocation() > 0);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 
totalMsgs);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
 totalMsgs);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
 totalMsgs);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
 totalMsgs);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
 0);
+        
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
 0);
+        
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
 > 0);
+
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+        
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
 functionStats.getAvgProcessLatency());
+    }
+
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionStatus() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, 
namespacePortion, functionName,
+                "my.*", sinkTopic, subscriptionName);
+        admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
+
+        // try to update function to test: update-function functionality
+        admin.functions().updateFunctionWithUrl(functionConfig, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, 
propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0 && 
subStats.msgThroughputOut == totalMsgs;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+        FunctionStatusList functionStatus = 
functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
                 functionName, null);
 
-        int numInstances = functionStats.getFunctionStatusListCount();
+        int numInstances = functionStatus.getFunctionStatusListCount();
         assertEquals(numInstances, 1);
 
-        FunctionStatus stats = 
functionStats.getFunctionStatusListList().get(0);
+        FunctionStatus stats = 
functionStatus.getFunctionStatusListList().get(0);
 
         double count = stats.getNumProcessed();
         double success = stats.getNumSuccessfullyProcessed();
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3a0d1e1..bfadbe4 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -659,11 +659,10 @@ public class CmdFunctions extends CmdBase {
         @Override
         void runCmd() throws Exception {
 
-            Gson gson = new GsonBuilder().setPrettyPrinting().create();
             if (isBlank(instanceId)) {
-                
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, 
namespace, functionName)));
+                print(admin.functions().getFunctionStats(tenant, namespace, 
functionName));
             } else {
-                
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, 
namespace, functionName, Integer.parseInt(instanceId))));
+               print(admin.functions().getFunctionStats(tenant, namespace, 
functionName, Integer.parseInt(instanceId)));
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index 99e52e5..ba274c1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import lombok.Data;
 
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
 import java.util.function.Consumer;
 
 @Data
+@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "instances" })
 public class FunctionStats {
 
     /**
@@ -54,20 +57,24 @@ public class FunctionStats {
      **/
     public double avgProcessLatency;
 
+    @JsonProperty("1min")
+    public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new 
FunctionInstanceStats.FunctionInstanceStatsDataBase();
+
     /**
      * Timestamp of when the function was last invoked by any instance
      **/
     public long lastInvocation;
 
     @Data
+    @JsonPropertyOrder({ "instanceId", "metrics" })
     public static class FunctionInstanceStats {
 
         /** Instance Id of function instance **/
         public int instanceId;
 
         @Data
-        public static class FunctionInstanceStatsData {
-
+        @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
+        public static class FunctionInstanceStatsDataBase {
             /**
              * Total number of records function received from source for 
instance
              **/
@@ -92,6 +99,14 @@ public class FunctionStats {
              * Average process latency for function for instance
              **/
             public double avgProcessLatency;
+        }
+
+        @Data
+        @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", 
"systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", 
"lastInvocation", "userMetrics" })
+        public static class FunctionInstanceStatsData extends 
FunctionInstanceStatsDataBase {
+
+            @JsonProperty("1min")
+            public FunctionInstanceStatsDataBase oneMin = new 
FunctionInstanceStatsDataBase();
 
             /**
              * Timestamp of when the function was last invoked for instance
@@ -125,6 +140,13 @@ public class FunctionStats {
                 systemExceptionsTotal += 
functionInstanceStatsData.systemExceptionsTotal;
                 userExceptionsTotal += 
functionInstanceStatsData.userExceptionsTotal;
                 avgProcessLatency += 
functionInstanceStatsData.avgProcessLatency;
+
+                oneMin.receivedTotal += 
functionInstanceStatsData.oneMin.receivedTotal;
+                oneMin.processedSuccessfullyTotal += 
functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
+                oneMin.systemExceptionsTotal += 
functionInstanceStatsData.oneMin.systemExceptionsTotal;
+                oneMin.userExceptionsTotal += 
functionInstanceStatsData.oneMin.userExceptionsTotal;
+                oneMin.avgProcessLatency += 
functionInstanceStatsData.oneMin.avgProcessLatency;
+
                 if (functionInstanceStatsData.lastInvocation > lastInvocation) 
{
                     lastInvocation = functionInstanceStatsData.lastInvocation;
                 }
@@ -133,6 +155,10 @@ public class FunctionStats {
         });
         // calculate average from sum
         avgProcessLatency = avgProcessLatency / instances.size();
+
+        // calculate 1min average from sum
+        oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+
         return this;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 388efb1..4d66422 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -28,13 +28,17 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Function stats.
  */
 @Slf4j
 @Getter
 @Setter
-public class FunctionStatsManager {
+public class FunctionStatsManager implements AutoCloseable {
 
     static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster"};
 
@@ -50,6 +54,13 @@ public class FunctionStatsManager {
     public static final String LAST_INVOCATION = "last_invocation";
     public static final String RECEIVED_TOTAL = "received_total";
 
+    public static final String PROCESSED_TOTAL_1min = "processed_total_1min";
+    public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = 
"processed_successfully_total_1min";
+    public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = 
"system_exceptions_total_1min";
+    public static final String USER_EXCEPTIONS_TOTAL_1min = 
"user_exceptions_total_1min";
+    public static final String PROCESS_LATENCY_MS_1min = 
"process_latency_ms_1min";
+    public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+
     /** Declare Prometheus stats **/
 
     final Counter statTotalProcessed;
@@ -65,18 +76,33 @@ public class FunctionStatsManager {
     final Gauge statlastInvocation;
 
     final Counter statTotalRecordsRecieved;
+    
+    // windowed metrics
+
+    final Counter statTotalProcessed1min;
+
+    final Counter statTotalProcessedSuccessfully1min;
+
+    final Counter statTotalSysExceptions1min;
+
+    final Counter statTotalUserExceptions1min;
 
-    CollectorRegistry functionCollectorRegistry;
+    final Summary statProcessLatency1min;
+
+    final Counter statTotalRecordsRecieved1min;
+
+    private String[] metricsLabels;
+
+    private ScheduledFuture<?> scheduledFuture;
 
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions = EvictingQueue.create(10);
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
 
-    public FunctionStatsManager(CollectorRegistry collectorRegistry) {
-        // Declare function local collector registry so that it will not clash 
with other function instances'
-        // metrics collection especially in threaded mode
-        functionCollectorRegistry = new CollectorRegistry();
+    public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
+
+        this.metricsLabels = metricsLabels;
 
         statTotalProcessed = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
@@ -114,7 +140,7 @@ public class FunctionStatsManager {
 
         statlastInvocation = Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
-                .help("The timestamp of the last invocation of the function")
+                .help("The timestamp of the last invocation of the function.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
@@ -123,6 +149,57 @@ public class FunctionStatsManager {
                 .help("Total number of messages received from source.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
+
+        statTotalProcessed1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min)
+                .help("Total number of messages processed in the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalProcessedSuccessfully1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESSED_SUCCESSFULLY_TOTAL_1min)
+                .help("Total number of messages processed successfully in the 
last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalSysExceptions1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + 
SYSTEM_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of system exceptions in the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalUserExceptions1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + 
USER_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of user exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statProcessLatency1min = Summary.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
+                .help("Process latency in milliseconds in the last 1 minute.")
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalRecordsRecieved1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+                .help("Total number of messages received from source in the 
last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
+            @Override
+            public void run() {
+                try {
+                    reset();
+                } catch (Exception e) {
+                    log.error("Failed to reset metrics for 1min window", e);
+                }
+            }
+        }, 1, 1, TimeUnit.MINUTES);
     }
 
     public void addUserException(Exception ex) {
@@ -140,15 +217,149 @@ public class FunctionStatsManager {
 
     }
 
+    public void incrTotalReceived() {
+        statTotalRecordsRecieved.labels(metricsLabels).inc();
+        statTotalRecordsRecieved1min.labels(metricsLabels).inc();
+    }
+
+    public void incrTotalProcessed() {
+        statTotalProcessed.labels(metricsLabels).inc();
+        statTotalProcessed1min.labels(metricsLabels).inc();
+    }
+
+    public void incrTotalProcessedSuccessfully() {
+        statTotalProcessedSuccessfully.labels(metricsLabels).inc();
+        statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
+    }
+
+    public void incrSysExceptions(Throwable sysException) {
+        statTotalSysExceptions.labels(metricsLabels).inc();
+        statTotalSysExceptions1min.labels(metricsLabels).inc();
+        addSystemException(sysException);
+    }
+
+    public void incrUserExceptions(Exception userException) {
+        statTotalUserExceptions.labels(metricsLabels).inc();
+        statTotalUserExceptions1min.labels(metricsLabels).inc();
+        addUserException(userException);
+    }
+
+    public void setLastInvocation(long ts) {
+        statlastInvocation.labels(metricsLabels).set(ts);
+    }
+
+    private Long processTimeStart;
+    public void processTimeStart() {
+        processTimeStart = System.nanoTime();
+    }
+
+    public void processTimeEnd() {
+        if (processTimeStart != null) {
+            double endTimeMs = ((double) System.nanoTime() - processTimeStart) 
/ 1.0E6D;
+            statProcessLatency.labels(metricsLabels).observe(endTimeMs);
+            statProcessLatency1min.labels(metricsLabels).observe(endTimeMs);
+        }
+    }
+
+    public double getTotalProcessed() {
+        return statTotalProcessed.labels(metricsLabels).get();
+    }
+
+    public double getTotalProcessedSuccessfully() {
+        return statTotalProcessedSuccessfully.labels(metricsLabels).get();
+    }
+
+    public double getTotalRecordsReceived() {
+        return statTotalRecordsRecieved.labels(metricsLabels).get();
+    }
+
+    public double getTotalSysExceptions() {
+        return statTotalSysExceptions.labels(metricsLabels).get();
+    }
+
+    public double getTotalUserExceptions() {
+        return statTotalUserExceptions.labels(metricsLabels).get();
+    }
+
+    public double getLastInvocation() {
+        return statlastInvocation.labels(metricsLabels).get();
+    }
+
+    public double getAvgProcessLatency() {
+        return statProcessLatency.labels(metricsLabels).get().count <= 0.0
+                ? 0 : statProcessLatency.labels(metricsLabels).get().sum / 
statProcessLatency.labels(metricsLabels).get().count;
+    }
+
+    public double getProcessLatency50P() {
+        return 
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.5);
+    }
+
+    public double getProcessLatency90P() {
+        return 
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.9);
+    }
+
+    public double getProcessLatency99P() {
+        return 
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.99);
+    }
+
+    public double getProcessLatency99_9P() {
+        return 
statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
+    }
+
+    public double getTotalProcessed1min() {
+        return statTotalProcessed1min.labels(metricsLabels).get();
+    }
+
+    public double getTotalProcessedSuccessfully1min() {
+        return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
+    }
+
+    public double getTotalRecordsReceived1min() {
+        return statTotalRecordsRecieved1min.labels(metricsLabels).get();
+    }
+
+    public double getTotalSysExceptions1min() {
+        return statTotalSysExceptions1min.labels(metricsLabels).get();
+    }
+
+    public double getTotalUserExceptions1min() {
+        return statTotalUserExceptions1min.labels(metricsLabels).get();
+    }
+
+    public double getAvgProcessLatency1min() {
+        return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0
+                ? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / 
statProcessLatency1min.labels(metricsLabels).get().count;
+    }
+
+    public double getProcessLatency50P1min() {
+        return 
statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.5);
+    }
+
+    public double getProcessLatency90P1min() {
+        return 
statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.9);
+    }
+
+    public double getProcessLatency99P1min() {
+        return 
statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.99);
+    }
+
+    public double getProcessLatency99_9P1min() {
+        return 
statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.999);
+    }
+
     public void reset() {
-        statTotalProcessed.clear();
-        statTotalProcessedSuccessfully.clear();
-        statTotalSysExceptions.clear();
-        statTotalUserExceptions.clear();
-        statProcessLatency.clear();
-        statlastInvocation.clear();
-        statTotalRecordsRecieved.clear();
+        statTotalProcessed1min.clear();
+        statTotalProcessedSuccessfully1min.clear();
+        statTotalSysExceptions1min.clear();
+        statTotalUserExceptions1min.clear();
+        statProcessLatency1min.clear();
+        statTotalRecordsRecieved1min.clear();
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
     }
+
+    @Override
+    public void close() {
+        scheduledFuture.cancel(false);
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
new file mode 100644
index 0000000..7a6b2ca
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class InstanceCache {
+
+    private static InstanceCache instance;
+
+    public final ScheduledExecutorService executor;
+
+    private InstanceCache() {
+        executor = Executors.newSingleThreadScheduledExecutor();;
+    }
+
+    public static InstanceCache getInstanceCache() {
+        synchronized (InstanceCache.class) {
+            if (instance == null) {
+                instance = new InstanceCache();
+            }
+        }
+        return instance;
+    }
+
+    public static void shutdown() {
+        synchronized (InstanceCache.class) {
+            if (instance != null) {
+                instance.executor.shutdown();
+            }
+            instance = null;
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3a54455..70ed588 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -23,8 +23,6 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Summary;
-import javax.swing.tree.ExpandVetoException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -62,9 +60,11 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.StateUtils;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
@@ -110,7 +110,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Throwable deathException;
 
     // function stats
-    private final FunctionStatsManager stats;
+    private FunctionStatsManager stats;
 
     private Record<?> currentRecord;
 
@@ -122,6 +122,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private CollectorRegistry collectorRegistry;
     private final String[] metricsLabels;
 
+    private InstanceCache instanceCache;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -134,7 +136,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
-        this.stats = new FunctionStatsManager(collectorRegistry);
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
         this.metricsLabels = new String[]{
@@ -147,6 +148,11 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 String.valueOf(instanceConfig.getInstanceId()),
                 instanceConfig.getClusterName()
         };
+
+        // Declare function local collector registry so that it will not clash 
with other function instances'
+        // metrics collection especially in threaded mode
+        // In process mode the JavaInstanceMain will declare a 
CollectorRegistry and pass it down
+        this.collectorRegistry = collectorRegistry;
     }
 
     /**
@@ -199,10 +205,15 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
      */
     @Override
     public void run() {
-        String functionName = null;
         try {
+            this.instanceCache = InstanceCache.getInstanceCache();
+
+            if (this.collectorRegistry == null) {
+                this.collectorRegistry = new CollectorRegistry();
+            }
+            this.stats = new FunctionStatsManager(this.collectorRegistry, 
this.metricsLabels, this.instanceCache.executor);
+
             ContextImpl contextImpl = setupContext();
-            functionName = String.format("%s-%s", contextImpl.getTenant(), 
contextImpl.getFunctionName());
             javaInstance = setupJavaInstance(contextImpl);
             if (null != stateTable) {
                 StateContextImpl stateContext = new 
StateContextImpl(stateTable);
@@ -212,7 +223,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 currentRecord = readInput();
 
                 // increment number of records received from source
-                stats.statTotalRecordsRecieved.labels(metricsLabels).inc();
+                stats.incrTotalReceived();
 
                 if 
(instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
org.apache.pulsar.functions
                         .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
@@ -225,18 +236,18 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 JavaExecutionResult result;
 
                 // set last invocation time
-                
stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis());
+                stats.setLastInvocation(System.currentTimeMillis());
 
                 // start time for process latency stat
-                Summary.Timer requestTimer = 
stats.statProcessLatency.labels(metricsLabels).startTimer();
+                stats.processTimeStart();
 
                 // process the message
                 result = javaInstance.handleMessage(currentRecord, 
currentRecord.getValue());
 
                 // register end time
-                requestTimer.observeDuration();
+                stats.processTimeEnd();
                 // increment total processed
-                stats.statTotalProcessed.labels(metricsLabels).inc();
+                stats.incrTotalProcessed();
 
                 removeLogTopicHandler();
 
@@ -252,10 +263,13 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 }
             }
         } catch (Throwable t) {
-            log.error("[{}] Uncaught exception in Java Instance", 
functionName, t);
+            log.error("[{}] Uncaught exception in Java Instance", 
Utils.getFullyQualifiedInstanceId(
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(),
+                    instanceConfig.getInstanceId()), t);
             deathException = t;
-            stats.statTotalSysExceptions.labels(metricsLabels).inc();
-            stats.addSystemException(t);
+            stats.incrSysExceptions(t);
             return;
         } finally {
             log.info("Closing instance");
@@ -357,8 +371,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                                JavaExecutionResult result) throws Exception {
         if (result.getUserException() != null) {
             log.info("Encountered user exception when processing message {}", 
srcRecord, result.getUserException());
-            stats.statTotalUserExceptions.labels(metricsLabels).inc();
-            stats.addUserException(result.getUserException() );
+            stats.incrUserExceptions(result.getUserException());
             srcRecord.fail();
         } else {
             if (result.getResult() != null) {
@@ -370,7 +383,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 }
             }
             // increment total successfully processed
-            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
+            stats.incrTotalProcessedSuccessfully();
         }
     }
 
@@ -403,6 +416,11 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 
     @Override
     public void close() {
+
+        if (stats != null) {
+            stats.close();
+        }
+
         if (source != null) {
             try {
                 source.close();
@@ -465,35 +483,38 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
 
-        bldr.setProcessedTotal((long) 
stats.statTotalProcessed.labels(metricsLabels).get());
-        bldr.setProcessedSuccessfullyTotal((long) 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-        bldr.setSystemExceptionsTotal((long) 
stats.statTotalSysExceptions.labels(metricsLabels).get());
-        bldr.setUserExceptionsTotal((long) 
stats.statTotalUserExceptions.labels(metricsLabels).get());
-        bldr.setReceivedTotal((long) 
stats.statTotalRecordsRecieved.labels(metricsLabels).get());
-        
bldr.setAvgProcessLatency(stats.statProcessLatency.labels(metricsLabels).get().count
 <= 0.0
-                ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum 
/ stats.statProcessLatency.labels(metricsLabels).get().count);
-        bldr.setLastInvocation((long) 
stats.statlastInvocation.labels(metricsLabels).get());
+        bldr.setProcessedTotal((long) stats.getTotalProcessed());
+        bldr.setProcessedSuccessfullyTotal((long) 
stats.getTotalProcessedSuccessfully());
+        bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
+        bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
+        bldr.setReceivedTotal((long) stats.getTotalRecordsReceived());
+        bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
+        bldr.setLastInvocation((long) stats.getLastInvocation());
+
+        bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min());
+        bldr.setProcessedSuccessfullyTotal1Min((long) 
stats.getTotalProcessedSuccessfully1min());
+        bldr.setSystemExceptionsTotal1Min((long) 
stats.getTotalSysExceptions1min());
+        bldr.setUserExceptionsTotal1Min((long) 
stats.getTotalUserExceptions1min());
+        bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min());
+        bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min());
 
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
-        functionStatusBuilder.setNumProcessed((long) 
stats.statTotalProcessed.labels(metricsLabels).get());
-        functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
-        functionStatusBuilder.setNumUserExceptions((long) 
stats.statTotalUserExceptions.labels(metricsLabels).get());
+        functionStatusBuilder.setNumProcessed((long) 
stats.getTotalProcessed());
+        functionStatusBuilder.setNumSuccessfullyProcessed((long) 
stats.getTotalProcessedSuccessfully());
+        functionStatusBuilder.setNumUserExceptions((long) 
stats.getTotalUserExceptions());
         stats.getLatestUserExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestUserExceptions(ex);
         });
-        functionStatusBuilder.setNumSystemExceptions((long) 
stats.statTotalSysExceptions.labels(metricsLabels).get());
+        functionStatusBuilder.setNumSystemExceptions((long) 
stats.getTotalSysExceptions());
         stats.getLatestSystemExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestSystemExceptions(ex);
         });
-        functionStatusBuilder.setAverageLatency(
-                stats.statProcessLatency.labels(metricsLabels).get().count == 
0.0
-                        ? 0 : 
stats.statProcessLatency.labels(metricsLabels).get().sum / 
stats.statProcessLatency
-                        .labels(metricsLabels).get().count);
-        functionStatusBuilder.setLastInvocationTime((long) 
stats.statlastInvocation.labels(metricsLabels).get());
+        functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
+        functionStatusBuilder.setLastInvocationTime((long) 
stats.getLastInvocation());
         return functionStatusBuilder;
     }
 
diff --git 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index 6c843dc..f88e6a3 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -320,8 +320,8 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1117,
-  serialized_end=1167,
+  serialized_start=1317,
+  serialized_end=1367,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -339,49 +339,91 @@ _METRICSDATA = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedTotal', full_name='proto.MetricsData.processedTotal', 
index=1,
+      name='receivedTotal_1min', 
full_name='proto.MetricsData.receivedTotal_1min', index=1,
+      number=10, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='processedTotal', full_name='proto.MetricsData.processedTotal', 
index=2,
       number=3, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedSuccessfullyTotal', 
full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
+      name='processedTotal_1min', 
full_name='proto.MetricsData.processedTotal_1min', index=3,
+      number=11, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='processedSuccessfullyTotal', 
full_name='proto.MetricsData.processedSuccessfullyTotal', index=4,
       number=4, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='systemExceptionsTotal', 
full_name='proto.MetricsData.systemExceptionsTotal', index=3,
+      name='processedSuccessfullyTotal_1min', 
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=5,
+      number=12, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='systemExceptionsTotal', 
full_name='proto.MetricsData.systemExceptionsTotal', index=6,
       number=5, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userExceptionsTotal', 
full_name='proto.MetricsData.userExceptionsTotal', index=4,
+      name='systemExceptionsTotal_1min', 
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=7,
+      number=13, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='userExceptionsTotal', 
full_name='proto.MetricsData.userExceptionsTotal', index=8,
       number=6, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='avgProcessLatency', 
full_name='proto.MetricsData.avgProcessLatency', index=5,
+      name='userExceptionsTotal_1min', 
full_name='proto.MetricsData.userExceptionsTotal_1min', index=9,
+      number=14, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='avgProcessLatency', 
full_name='proto.MetricsData.avgProcessLatency', index=10,
       number=7, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', 
index=6,
+      name='avgProcessLatency_1min', 
full_name='proto.MetricsData.avgProcessLatency_1min', index=11,
+      number=15, type=1, cpp_type=5, label=1,
+      has_default_value=False, default_value=float(0),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', 
index=12,
       number=8, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=7,
+      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=13,
       number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -400,7 +442,7 @@ _METRICSDATA = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=850,
-  serialized_end=1167,
+  serialized_end=1367,
 )
 
 
@@ -430,8 +472,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1169,
-  serialized_end=1205,
+  serialized_start=1369,
+  serialized_end=1405,
 )
 
 
@@ -475,8 +517,8 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1268,
-  serialized_end=1360,
+  serialized_start=1468,
+  serialized_end=1560,
 )
 
 _METRICS = _descriptor.Descriptor(
@@ -505,8 +547,8 @@ _METRICS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1208,
-  serialized_end=1360,
+  serialized_start=1408,
+  serialized_end=1560,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -608,8 +650,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1363,
-  serialized_end=1711,
+  serialized_start=1563,
+  serialized_end=1911,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
index ff57509..f720a41 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -19,6 +19,7 @@
 
 import traceback
 import time
+import util
 
 from prometheus_client import Counter, Summary, Gauge
 
@@ -37,6 +38,13 @@ class Stats(object):
   LAST_INVOCATION = 'last_invocation'
   TOTAL_RECEIVED = 'received_total'
 
+  TOTAL_PROCESSED_1min = 'processed_total_1min'
+  TOTAL_SUCCESSFULLY_PROCESSED_1min = 'processed_successfully_total_1min'
+  TOTAL_SYSTEM_EXCEPTIONS_1min = 'system_exceptions_total_1min'
+  TOTAL_USER_EXCEPTIONS_1min = 'user_exceptions_total_1min'
+  PROCESS_LATENCY_MS_1min = 'process_latency_ms_1min'
+  TOTAL_RECEIVED_1min = 'received_total_1min'
+
   # Declare Prometheus
   stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
   stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_SUCCESSFULLY_PROCESSED,
@@ -52,9 +60,116 @@ class Stats(object):
 
   stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_RECEIVED, 'Total number of messages received from source.', 
metrics_label_names)
 
+
+  # 1min windowed metrics
+  stat_total_processed_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_PROCESSED_1min,
+                                 'Total number of messages processed in the 
last 1 minute.', metrics_label_names)
+  stat_total_processed_successfully_1min = 
Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
+                                              'Total number of messages 
processed successfully in the last 1 minute.', metrics_label_names)
+  stat_total_sys_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_SYSTEM_EXCEPTIONS_1min,
+                                      'Total number of system exceptions in 
the last 1 minute.',
+                                      metrics_label_names)
+  stat_total_user_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_USER_EXCEPTIONS_1min,
+                                       'Total number of user exceptions in the 
last 1 minute.',
+                                       metrics_label_names)
+
+  stat_process_latency_ms_1min = Summary(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESS_LATENCY_MS_1min,
+                                    'Process latency in milliseconds in the 
last 1 minute.', metrics_label_names)
+
+  stat_total_received_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + 
TOTAL_RECEIVED_1min,
+                                'Total number of messages received from source 
in the last 1 minute.', metrics_label_names)
+
   latest_user_exception = []
   latest_sys_exception = []
 
+  def __init__(self, metrics_labels):
+    self.metrics_labels = metrics_labels;
+    self.process_start_time = None
+
+    # start time for windowed metrics
+    util.FixedTimer(60, self.reset).start()
+
+  def get_total_received(self):
+    return self.stat_total_received.labels(*self.metrics_labels)._value.get();
+
+  def get_total_processed(self):
+    return self.stat_total_processed.labels(*self.metrics_labels)._value.get();
+
+  def get_total_processed_successfully(self):
+    return 
self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
+
+  def get_total_sys_exceptions(self):
+    return 
self.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get();
+
+  def get_total_user_exceptions(self):
+    return 
self.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get();
+
+  def get_avg_process_latency(self):
+    process_latency_ms_count = 
self.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    process_latency_ms_sum = 
self.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    return 0.0 \
+      if process_latency_ms_count <= 0.0 \
+      else process_latency_ms_sum / process_latency_ms_count
+
+  def get_total_received_1min(self):
+    return 
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get();
+
+  def get_total_processed_1min(self):
+    return 
self.stat_total_processed_1min.labels(*self.metrics_labels)._value.get();
+
+  def get_total_processed_successfully_1min(self):
+    return 
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get();
+
+  def get_total_sys_exceptions_1min(self):
+    return 
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get();
+
+  def get_total_user_exceptions_1min(self):
+    return 
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get();
+
+  def get_avg_process_latency_1min(self):
+    process_latency_ms_count = 
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
+    process_latency_ms_sum = 
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.get()
+    return 0.0 \
+      if process_latency_ms_count <= 0.0 \
+      else process_latency_ms_sum / process_latency_ms_count
+
+  def get_last_invocation(self):
+    return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+  def incr_total_processed(self):
+    self.stat_total_processed.labels(*self.metrics_labels).inc()
+    self.stat_total_processed_1min.labels(*self.metrics_labels).inc()
+
+  def incr_total_processed_successfully(self):
+    self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
+    
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
+
+  def incr_total_sys_exceptions(self):
+    self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
+    self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
+    self.add_sys_exception()
+
+  def incr_total_user_exceptions(self):
+    self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
+    self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
+    self.add_user_exception()
+
+  def incr_total_received(self):
+    self.stat_total_received.labels(*self.metrics_labels).inc()
+    self.stat_total_received_1min.labels(*self.metrics_labels).inc()
+
+  def process_time_start(self):
+    self.process_start_time = time.time();
+
+  def process_time_end(self):
+    if self.process_start_time:
+      duration = (time.time() - self.process_start_time) * 1000.0
+      
self.stat_process_latency_ms.labels(*self.metrics_labels).observe(duration)
+      
self.stat_process_latency_ms_1min.labels(*self.metrics_labels).observe(duration)
+
+  def set_last_invocation(self, time):
+    self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
+
   def add_user_exception(self):
     self.latest_sys_exception.append((traceback.format_exc(), int(time.time() 
* 1000)))
     if len(self.latest_sys_exception) > 10:
@@ -65,14 +180,13 @@ class Stats(object):
     if len(self.latest_sys_exception) > 10:
       self.latest_sys_exception.pop(0)
 
-  def reset(self, metrics_labels):
+  def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
-    self.stat_total_processed.labels(*metrics_labels)._value.set(0.0)
-    
self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0)
-    self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0)
-    self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0)
-    self.stat_last_invocation.labels(*metrics_labels).set(0.0)
-    self.stat_total_received.labels(*metrics_labels)._value.set(0.0)
\ No newline at end of file
+    self.stat_total_processed_1min.labels(*self.metrics_labels)._value.set(0.0)
+    
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
+    
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
+    
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
+    
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.set(0.0)
+    
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.set(0.0)
+    self.stat_total_received_1min.labels(*self.metrics_labels)._value.set(0.0)
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index f05e7b2..ec85af1 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -88,7 +88,6 @@ class PythonInstance(object):
     self.atleast_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE')
     self.auto_ack = self.instance_config.function_details.autoAck
     self.contextimpl = None
-    self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
@@ -97,6 +96,7 @@ class PythonInstance(object):
                            "%s/%s" % (function_details.tenant, 
function_details.namespace),
                            "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name),
                            instance_id, cluster_name]
+    self.stats = Stats(self.metrics_labels)
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -199,31 +199,35 @@ class PythonInstance(object):
         successfully_executed = False
         try:
           # get user function start time for statistic calculation
-          start_time = time.time()
-          
Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0)
+          self.stats.set_last_invocation(time.time())
+
+          # start timer for process time
+          self.stats.process_time_start()
           if self.function_class is not None:
             output_object = self.function_class.process(input_object, 
self.contextimpl)
           else:
             output_object = self.function_purefunction.process(input_object)
           successfully_executed = True
-          
Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time() 
- start_time) * 1000.0)
-          Stats.stat_total_processed.labels(*self.metrics_labels).inc()
+
+          # stop timer for process time
+          self.stats.process_time_end()
+
+          # incr total processed stat
+          self.stats.incr_total_processed()
         except Exception as e:
           Log.exception("Exception while executing user method")
-          Stats.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
-          self.stats.add_user_exception()
+          self.stats.incr_total_user_exceptions();
 
         if self.log_topic_handler is not None:
           log.remove_all_handlers()
           log.add_handler(self.saved_log_handler)
         if successfully_executed:
           self.process_result(output_object, msg)
-          
Stats.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
+          self.stats.incr_total_processed_successfully()
 
       except Exception as e:
         Log.error("Uncaught exception in Python instance: %s" % e);
-        Stats.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
-        self.stats.add_sys_exception()
+        self.stats.incr_total_sys_exceptions()
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
@@ -269,7 +273,7 @@ class PythonInstance(object):
 
   def message_listener(self, serde, consumer, message):
     # increment number of received records from source
-    Stats.stat_total_received.labels(*self.metrics_labels).inc()
+    self.stats.incr_total_received()
     item = InternalMessage(message, consumer.topic(), serde, consumer)
     self.queue.put(item, True)
     if self.atmost_once and self.auto_ack:
@@ -282,31 +286,45 @@ class PythonInstance(object):
     return metrics
 
   def reset_metrics(self):
-    self.stats.reset(self.metrics_labels)
+    self.stats.reset()
     self.contextimpl.reset_metrics()
 
   def get_metrics(self):
 
-    total_received =  
Stats.stat_total_received.labels(*self.metrics_labels)._value.get()
-    total_processed = 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
-    total_processed_successfully = 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
-    total_user_exceptions = 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
-    total_sys_exceptions = 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
-    process_latency_ms_count = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    process_latency_ms_sum = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
-    last_invocation = 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+    total_received =  self.stats.get_total_received()
+    total_processed = self.stats.get_total_processed()
+    total_processed_successfully = 
self.stats.get_total_processed_successfully()
+    total_user_exceptions = self.stats.get_total_user_exceptions()
+    total_sys_exceptions = self.stats.get_total_sys_exceptions()
+    avg_process_latency_ms = self.stats.get_avg_process_latency()
+    last_invocation = self.stats.get_last_invocation()
+
+    total_received_1min = self.stats.get_total_received_1min()
+    total_processed_1min = self.stats.get_total_processed_1min()
+    total_processed_successfully_1min = 
self.stats.get_total_processed_successfully_1min()
+    total_user_exceptions_1min = self.stats.get_total_user_exceptions_1min()
+    total_sys_exceptions_1min = self.stats.get_total_sys_exceptions_1min()
+    avg_process_latency_ms_1min = self.stats.get_avg_process_latency_1min()
 
     metrics_data = InstanceCommunication_pb2.MetricsData()
-
+    # total metrics
     metrics_data.receivedTotal = int(total_received) if sys.version_info.major 
>= 3 else long(total_received)
     metrics_data.processedTotal = int(total_processed) if 
sys.version_info.major >= 3 else long(total_processed)
     metrics_data.processedSuccessfullyTotal = 
int(total_processed_successfully) if sys.version_info.major >= 3 else 
long(total_processed_successfully)
     metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if 
sys.version_info.major >= 3 else long(total_sys_exceptions)
     metrics_data.userExceptionsTotal = int(total_user_exceptions) if 
sys.version_info.major >= 3 else long(total_user_exceptions)
-    metrics_data.avgProcessLatency = 0.0 \
-      if process_latency_ms_count <= 0.0 \
-      else process_latency_ms_sum / process_latency_ms_count
+    metrics_data.avgProcessLatency = avg_process_latency_ms
     metrics_data.lastInvocation = int(last_invocation) if 
sys.version_info.major >= 3 else long(last_invocation)
+    # 1min metrics
+    metrics_data.receivedTotal_1min = int(total_received_1min) if 
sys.version_info.major >= 3 else long(total_received_1min)
+    metrics_data.processedTotal_1min = int(total_processed_1min) if 
sys.version_info.major >= 3 else long(total_processed_1min)
+    metrics_data.processedSuccessfullyTotal_1min = int(
+      total_processed_successfully_1min) if sys.version_info.major >= 3 else 
long(total_processed_successfully_1min)
+    metrics_data.systemExceptionsTotal_1min = int(total_sys_exceptions_1min) 
if sys.version_info.major >= 3 else long(
+      total_sys_exceptions_1min)
+    metrics_data.userExceptionsTotal_1min = int(total_user_exceptions_1min) if 
sys.version_info.major >= 3 else long(
+      total_user_exceptions_1min)
+    metrics_data.avgProcessLatency_1min = avg_process_latency_ms_1min
 
     # get any user metrics
     user_metrics = self.contextimpl.get_metrics()
@@ -325,13 +343,12 @@ class PythonInstance(object):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
 
-    total_processed = 
Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
-    total_processed_successfully = 
Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
-    total_user_exceptions = 
Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
-    total_sys_exceptions = 
Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
-    process_latency_ms_count = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    process_latency_ms_sum = 
Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
-    last_invocation = 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+    total_processed = self.stats.get_total_processed()
+    total_processed_successfully = 
self.stats.get_total_processed_successfully()
+    total_user_exceptions = self.stats.get_total_user_exceptions()
+    total_sys_exceptions = self.stats.get_total_sys_exceptions()
+    avg_process_latency_ms = self.stats.get_avg_process_latency()
+    last_invocation = self.stats.get_last_invocation()
 
     status.numProcessed = int(total_processed) if sys.version_info.major >= 3 
else long(total_processed)
     status.numSuccessfullyProcessed = int(total_processed_successfully) if 
sys.version_info.major >= 3 else long(total_processed_successfully)
@@ -346,9 +363,7 @@ class PythonInstance(object):
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.averageLatency = 0.0 \
-      if process_latency_ms_count <= 0.0 \
-      else process_latency_ms_sum / process_latency_ms_count
+    status.averageLatency = avg_process_latency_ms
     status.lastInvocationTime = int(last_invocation) if sys.version_info.major 
>= 3 else long(last_invocation)
     return status
 
diff --git a/pulsar-functions/instance/src/main/python/util.py 
b/pulsar-functions/instance/src/main/python/util.py
index 56c1ce1..76f75bd 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -26,6 +26,7 @@ import os
 import inspect
 import sys
 import importlib
+from threading import Timer
 
 import log
 
@@ -66,3 +67,21 @@ def import_class_from_path(from_path, full_class_name):
 
 def getFullyQualifiedFunctionName(tenant, namespace, name):
   return "%s/%s/%s" % (tenant, namespace, name)
+
+class FixedTimer():
+
+    def __init__(self, t, hFunction):
+        self.t = t
+        self.hFunction = hFunction
+        self.thread = Timer(self.t, self.handle_function)
+
+    def handle_function(self):
+        self.hFunction()
+        self.thread = Timer(self.t, self.handle_function)
+        self.thread.start()
+
+    def start(self):
+        self.thread.start()
+
+    def cancel(self):
+        self.thread.cancel()
\ No newline at end of file
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index d5f5b79..05d1399 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -71,21 +71,33 @@ message MetricsData {
     // Total number of records function received from source
     int64 receivedTotal = 2;
 
+    int64 receivedTotal_1min = 10;
+
     // Total number of records processed
     int64 processedTotal = 3;
 
+    int64 processedTotal_1min = 11;
+
     // Total number of records successfully processed by user function
     int64 processedSuccessfullyTotal = 4;
 
+    int64 processedSuccessfullyTotal_1min = 12;
+
     // Total number of system exceptions thrown
     int64 systemExceptionsTotal = 5;
 
+    int64 systemExceptionsTotal_1min = 13;
+
     // Total number of user exceptions thrown
     int64 userExceptionsTotal = 6;
 
+    int64 userExceptionsTotal_1min = 14;
+
     // Average process latency for function
     double avgProcessLatency = 7;
 
+    double avgProcessLatency_1min = 15;
+
     // Timestamp of when the function was last invoked
     int64 lastInvocation = 8;
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 5688411..6b5abce 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -94,7 +94,7 @@ public class RuntimeSpawner implements AutoCloseable {
                             runtime.start();
                         } catch (Exception e) {
                             log.error("{}/{}/{}-{} Function Restart failed", 
details.getTenant(),
-                                    details.getNamespace(), details.getName(), 
e);
+                                    details.getNamespace(), details.getName(), 
e, e);
                         }
                         numRestarts++;
                     }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 913d4ae..460cdb0 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -47,7 +47,12 @@ class ThreadRuntime implements Runtime {
     private InstanceConfig instanceConfig;
     private JavaInstanceRunnable javaInstanceRunnable;
     private ThreadGroup threadGroup;
-
+    private FunctionCacheManager fnCache;
+    private String jarFile;
+    private PulsarClient pulsarClient;
+    private String stateStorageServiceUrl;
+    private SecretsProvider secretsProvider;
+    private CollectorRegistry collectorRegistry;
     ThreadRuntime(InstanceConfig instanceConfig,
                   FunctionCacheManager fnCache,
                   ThreadGroup threadGroup,
@@ -61,22 +66,21 @@ class ThreadRuntime implements Runtime {
             throw new RuntimeException("Thread Container only supports Java 
Runtime");
         }
 
-        // if collector registry is not set, create one for this thread.
-        // since each thread / instance will needs its own collector registry 
for metrics collection
-        CollectorRegistry instanceCollectorRegistry = collectorRegistry;
-        if (instanceCollectorRegistry == null) {
-            instanceCollectorRegistry = new CollectorRegistry();
-        }
-
-        this.javaInstanceRunnable = new JavaInstanceRunnable(
-            instanceConfig,
-            fnCache,
-            jarFile,
-            pulsarClient,
-            stateStorageServiceUrl,
-            secretsProvider,
-            instanceCollectorRegistry);
         this.threadGroup = threadGroup;
+        this.fnCache = fnCache;
+        this.jarFile = jarFile;
+        this.pulsarClient = pulsarClient;
+        this.stateStorageServiceUrl = stateStorageServiceUrl;
+        this.secretsProvider = secretsProvider;
+        this.collectorRegistry = collectorRegistry;
+        this.javaInstanceRunnable = new JavaInstanceRunnable(
+                instanceConfig,
+                fnCache,
+                jarFile,
+                pulsarClient,
+                stateStorageServiceUrl,
+                secretsProvider,
+                collectorRegistry);
     }
 
     /**
@@ -84,6 +88,16 @@ class ThreadRuntime implements Runtime {
      */
     @Override
     public void start() {
+        // re-initialize JavaInstanceRunnable so that variables in constructor 
can be re-initialized
+        this.javaInstanceRunnable = new JavaInstanceRunnable(
+                instanceConfig,
+                fnCache,
+                jarFile,
+                pulsarClient,
+                stateStorageServiceUrl,
+                secretsProvider,
+                collectorRegistry);
+
         log.info("ThreadContainer starting function with instance config {}", 
instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
                 String.format("%s-%s",
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index ed76117..a3fd850 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
@@ -115,5 +116,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
         } catch (PulsarClientException e) {
             log.warn("Failed to close pulsar client when closing function 
container factory", e);
         }
+
+        // Shutdown instance cache
+        InstanceCache.shutdown();
     }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 8181e9f..7414446 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -336,4 +336,17 @@ public class Utils {
         }
         return null;
     }
+
+    public static String 
getFullyQualifiedInstanceId(org.apache.pulsar.functions.proto.Function.Instance 
instance) {
+        return getFullyQualifiedInstanceId(
+                
instance.getFunctionMetaData().getFunctionDetails().getTenant(),
+                
instance.getFunctionMetaData().getFunctionDetails().getNamespace(),
+                instance.getFunctionMetaData().getFunctionDetails().getName(),
+                instance.getInstanceId());
+    }
+
+    public static String getFullyQualifiedInstanceId(String tenant, String 
namespace,
+                                                     String functionName, int 
instanceId) {
+        return String.format("%s/%s/%s:%d", tenant, namespace, functionName, 
instanceId);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index add4013..98ec5a1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -58,7 +58,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 
 /**
@@ -318,7 +317,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         final String workerId = this.workerConfig.getWorkerId();
 
         if (assignedWorkerId.equals(workerId)) {
-            
stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), 
restart);
+            
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 restart);
             return Response.status(Status.OK).build();
         } else {
             // query other worker
@@ -356,7 +355,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             Assignment assignment = assignments.iterator().next();
             final String assignedWorkerId = assignment.getWorkerId();
             final String workerId = this.workerConfig.getWorkerId();
-            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
             if (assignedWorkerId.equals(workerId)) {
                 stopFunction(fullyQualifiedInstanceId, restart);
             } else {
@@ -384,7 +383,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             for (Assignment assignment : assignments) {
                 final String assignedWorkerId = assignment.getWorkerId();
                 final String workerId = this.workerConfig.getWorkerId();
-                String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
                 if (assignedWorkerId.equals(workerId)) {
                     stopFunction(fullyQualifiedInstanceId, restart);
                 } else {
@@ -427,7 +426,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         Map<String, Assignment> assignments = 
workerIdToAssignments.get(workerId);
         if (assignments != null) {
             assignments.values().forEach(assignment -> {
-                String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
                 try {
                     stopFunction(fullyQualifiedInstanceId, false);
                 } catch (Exception e) {
@@ -481,10 +480,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         // If I am running worker
         if (assignedWorkerId.equals(workerId)) {
             FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
-                    
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                    
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
-                return 
Utils.getFunctionInstanceStats(Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 functionRuntimeInfo).getMetrics();
+                return 
Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 functionRuntimeInfo).getMetrics();
             }
             return new 
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
         } else {
@@ -619,7 +618,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         // If I am running worker
         if (assignedWorkerId.equals(workerId)) {
             FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
-                    
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                    
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
                 try {
@@ -753,7 +752,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             existingAssignmentMap.putAll(entry);
         }
 
-        if 
(existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance())))
 {
+        if 
(existingAssignmentMap.containsKey(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(newAssignment.getInstance())))
 {
             updateAssignment(newAssignment);
         } else {
             addAssignment(newAssignment);
@@ -761,7 +760,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     }
 
     private void updateAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
         Assignment existingAssignment = this.findAssignment(assignment);
         // potential updates need to happen
         if (!existingAssignment.equals(assignment)) {
@@ -812,7 +811,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
     @VisibleForTesting
     void deleteAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
         Map<String, Assignment> assignmentMap = 
this.workerIdToAssignments.get(assignment.getWorkerId());
         if (assignmentMap != null) {
             if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
@@ -835,7 +834,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     }
 
     private void startFunctionInstance(Assignment assignment) {
-        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
         if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
             this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new 
FunctionRuntimeInfo()
                     .setFunctionInstance(assignment.getInstance()));
@@ -888,7 +887,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
     private Assignment findAssignment(String tenant, String namespace, String 
functionName, int instanceId) {
         String fullyQualifiedInstanceId
-                = Utils.getFullyQualifiedInstanceId(tenant, namespace, 
functionName, instanceId);
+                = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, 
namespace, functionName, instanceId);
         for (Map.Entry<String, Map<String, Assignment>> entry : 
this.workerIdToAssignments.entrySet()) {
             Map<String, Assignment> assignmentMap = entry.getValue();
             Assignment existingAssignment = 
assignmentMap.get(fullyQualifiedInstanceId);
@@ -914,7 +913,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             this.workerIdToAssignments.put(assignment.getWorkerId(), new 
HashMap<>());
         }
         this.workerIdToAssignments.get(assignment.getWorkerId()).put(
-                Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
+                
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
                 assignment);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index d6e7b47..efe702f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -194,7 +194,7 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
             Map.Entry<Function.Instance, Long> entry = it.next();
             String fullyQualifiedFunctionName = 
FunctionDetailsUtils.getFullyQualifiedName(
                     entry.getKey().getFunctionMetaData().getFunctionDetails());
-            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(entry.getKey());
+            String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(entry.getKey());
             //remove functions that don't exist anymore
             if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) {
                 it.remove();
@@ -262,7 +262,7 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
             if (currentTimeMs - unassignedDurationMs > 
this.workerConfig.getRescheduleTimeoutMs()) {
                 needSchedule.add(instance);
                 // remove assignment from failed node
-                Function.Assignment assignment = 
assignmentMap.get(Utils.getFullyQualifiedInstanceId(instance));
+                Function.Assignment assignment = 
assignmentMap.get(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(instance));
                 if (assignment != null) {
                     needRemove.add(assignment);
                 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c496766..2a93494 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -31,10 +31,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import lombok.Getter;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -49,6 +47,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -250,7 +249,7 @@ public class SchedulerManager implements AutoCloseable {
 
     private void publishNewAssignment(Assignment assignment, boolean deleted) {
         try {
-            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
             // publish empty message with instance-id key so, compactor can 
delete and skip delivery of this instance-id
             // message
             producer.newMessage().key(fullyQualifiedInstanceId)
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 7defdf6..d6863a7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -198,19 +199,6 @@ public final class Utils {
         }
     }
 
-    public static String getFullyQualifiedInstanceId(Function.Instance 
instance) {
-        return getFullyQualifiedInstanceId(
-                
instance.getFunctionMetaData().getFunctionDetails().getTenant(),
-                
instance.getFunctionMetaData().getFunctionDetails().getNamespace(),
-                instance.getFunctionMetaData().getFunctionDetails().getName(),
-                instance.getInstanceId());
-    }
-
-    public static String getFullyQualifiedInstanceId(String tenant, String 
namespace,
-                                                     String functionName, int 
instanceId) {
-        return String.format("%s/%s/%s:%d", tenant, namespace, functionName, 
instanceId);
-    }
-
     public static FunctionStats.FunctionInstanceStats 
getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo 
functionRuntimeInfo) {
         RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
 
@@ -234,6 +222,12 @@ public final class Utils {
                     
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
                     
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
 
+                    
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
+                    
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
+                    
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
+                    
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
+                    
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+
                     // Filter out values that are NaN
                     Map<String, Double> statsDataMap = 
metricsData.getUserMetricsMap().entrySet().stream()
                             .filter(stringDoubleEntry -> 
!stringDoubleEntry.getValue().isNaN())
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java
new file mode 100644
index 0000000..537ec1d
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import javax.ws.rs.core.Response.Status;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ErrorData;
+
+/**
+ * Exception used to provide better error messages to clients of the REST API.
+ */
+@SuppressWarnings("serial")
+public class RestException extends WebApplicationException {
+    static String getExceptionData(Throwable t) {
+        StringWriter writer = new StringWriter();
+        writer.append("\n --- An unexpected error occurred in the server 
---\n\n");
+        if (t != null) {
+            writer.append("Message: ").append(t.getMessage()).append("\n\n");
+        }
+        writer.append("Stacktrace:\n\n");
+
+        t.printStackTrace(new PrintWriter(writer));
+        return writer.toString();
+    }
+
+    public RestException(Response.Status status, String message) {
+        this(status.getStatusCode(), message);
+    }
+
+    public RestException(int code, String message) {
+        super(message, Response.status(code).entity(new 
ErrorData(message)).type(MediaType.APPLICATION_JSON).build());
+    }
+
+    public RestException(Throwable t) {
+        super(getResponse(t));
+    }
+
+    public RestException(PulsarAdminException cae) {
+        this(cae.getStatusCode(), cae.getHttpError());
+    }
+
+    private static Response getResponse(Throwable t) {
+        if (t instanceof RestException
+                || t instanceof WebApplicationException) {
+            WebApplicationException e = (WebApplicationException) t;
+            return e.getResponse();
+        } else {
+            return Response
+                    .status(Status.INTERNAL_SERVER_ERROR)
+                    .entity(getExceptionData(t))
+                    .type(MediaType.TEXT_PLAIN)
+                    .build();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 627f5cc..474032e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -105,6 +105,7 @@ import 
org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -802,33 +803,30 @@ public class FunctionsImpl {
 
     private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
 
-    public Response getFunctionStats(final String tenant, final String 
namespace, final String componentName,
+    public FunctionStats getFunctionStats(final String tenant, final String 
namespace, final String componentName,
                                      final String componentType, URI uri) 
throws IOException {
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
 
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            log.error("Invalid get {} Stats request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", 
componentType, componentName))).build();
+            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
         }
 
         FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s 
doesn't exist", componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
@@ -838,18 +836,18 @@ public class FunctionsImpl {
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new 
ErrorData(e.getMessage())).build();
+            log.error("{}/{}/{} Got Exception Getting Stats", tenant, 
namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
         }
 
-        return 
Response.status(Status.OK).entity(gson.toJson(functionStats)).build();
+        return functionStats;
 
     }
 
-    public Response getFunctionsInstanceStats(final String tenant, final 
String namespace, final String componentName,
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionsInstanceStats(final String tenant, final String namespace, final 
String componentName,
                                       final String componentType, String 
instanceId, URI uri) {
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
 
         // validate parameters
@@ -857,27 +855,25 @@ public class FunctionsImpl {
             validateGetFunctionInstanceRequestParams(tenant, namespace, 
componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
             log.error("Invalid get {} Stats request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+
         }
 
         FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
             log.error("{} in get {} Stats does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", 
componentType, componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
         }
         FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s 
doesn't exist", componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+
         }
         int instanceIdInt = Integer.parseInt(instanceId);
         if (instanceIdInt < 0 || instanceIdInt >= 
functionMetaData.getFunctionDetails().getParallelism()) {
             log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", 
componentType, tenant, namespace, componentName);
-            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Invalid 
InstanceId"))).build();
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s 
doesn't have instance with id %s", componentType, componentName, instanceId));
         }
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
@@ -889,10 +885,10 @@ public class FunctionsImpl {
             throw we;
         } catch (Exception e) {
             log.error("{}/{}/{} Got Exception Getting Stats", tenant, 
namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new 
ErrorData(e.getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
         }
 
-        return 
Response.status(Status.OK).entity(gson.toJson(functionInstanceStatsData)).build();
+        return functionInstanceStatsData;
     }
 
     public Response listFunctions(final String tenant, final String namespace, 
String componentType) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 3412d2e..ea23d26 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -28,7 +27,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import java.util.List;
 
 import javax.ws.rs.Consumes;
@@ -131,7 +129,7 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
     @Path("/{tenant}/{namespace}/{functionName}/stats")
-    public Response getFunctionStats(final @PathParam("tenant") String tenant,
+    public FunctionStats getFunctionStats(final @PathParam("tenant") String 
tenant,
                                      final @PathParam("namespace") String 
namespace,
                                      final @PathParam("functionName") String 
functionName) throws IOException {
         return functions.getFunctionStats(tenant, namespace, functionName, 
FunctionsImpl.FUNCTION, uri.getRequestUri());
@@ -147,10 +145,10 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public Response getFunctionInstanceStats(final @PathParam("tenant") String 
tenant,
-                                             final @PathParam("namespace") 
String namespace,
-                                             final @PathParam("functionName") 
String functionName,
-                                             final @PathParam("instanceId") 
String instanceId) throws IOException {
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionInstanceStats(final @PathParam("tenant") String tenant,
+                                                                               
                   final @PathParam("namespace") String namespace,
+                                                                               
                   final @PathParam("functionName") String functionName,
+                                                                               
                   final @PathParam("instanceId") String instanceId) throws 
IOException {
         return functions.getFunctionsInstanceStats(
                 tenant, namespace, functionName, FunctionsImpl.FUNCTION, 
instanceId, uri.getRequestUri());
     }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 20e4897..651e5d0 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.Utils;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -214,7 +215,7 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment1);
         functionRuntimeManager.processAssignment(assignment2);
 
-        
functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
+        
functionRuntimeManager.deleteAssignment(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
         
         verify(functionRuntimeManager, 
times(0)).setAssignment(any(Function.Assignment.class));
         verify(functionRuntimeManager, 
times(1)).deleteAssignment(any(String.class));
@@ -406,11 +407,11 @@ public class FunctionRuntimeManagerTest {
         List<Message<byte[]>> messageList = new LinkedList<>();
         Message message1 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
                         new HashMap<>(), 
Unpooled.copiedBuffer(assignment1.toByteArray()), null));
-        
doReturn(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+        
doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
 
         Message message2 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
                 new HashMap<>(), 
Unpooled.copiedBuffer(assignment2.toByteArray()), null));
-        
doReturn(Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+        
doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
 
         // delete function2
         Message message3 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 21a2852..efc2974 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -28,6 +28,7 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.Utils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index ffd1bd6..465729e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -154,7 +155,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -200,7 +201,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -247,7 +248,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -314,9 +315,9 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         //TODO: delete this assignment
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -373,7 +374,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -405,7 +406,7 @@ public class SchedulerManagerTest {
         Assert.assertEquals(assignments, assignment2);
 
         // updating assignments
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
 
         // scale up
 
@@ -484,7 +485,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -538,9 +539,9 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_3));
 
         // updating assignments
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()),
 assignment2_1);
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()),
 assignment2_2);
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()),
 assignment2_3);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()),
 assignment2_1);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()),
 assignment2_2);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()),
 assignment2_3);
 
         // scale down
 
@@ -665,7 +666,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -716,9 +717,9 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_3));
 
         // updating assignments
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()),
 assignment2_1);
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()),
 assignment2_2);
-        
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()),
 assignment2_3);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()),
 assignment2_1);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()),
 assignment2_2);
+        
currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()),
 assignment2_3);
 
         // update field
 
@@ -805,7 +806,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        
assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
 
         Map<String, Function.Assignment> assignmentEntry2 = new HashMap<>();
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index d1b24a2..b510030 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -72,10 +72,12 @@ public class PulsarConnectorCache {
     }
 
     public static void shutdown() throws ManagedLedgerException, 
InterruptedException {
-        if (instance != null) {
-            instance.managedLedgerFactory.shutdown();
-            instance.statsProvider.stop();
-            instance = null;
+        synchronized (PulsarConnectorCache.class) {
+            if (instance != null) {
+                instance.managedLedgerFactory.shutdown();
+                instance.statsProvider.stop();
+                instance = null;
+            }
         }
     }
 }

Reply via email to