This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cc4dd8  add sink and source prometheus stats (#3261)
5cc4dd8 is described below

commit 5cc4dd8b759f1bfd0ed76f88e065aee65ac6f857
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Dec 31 12:15:55 2018 -0800

    add sink and source prometheus stats (#3261)
    
    * add sink and source prometheus stats
    
    * fixing stuff
---
 distribution/io/src/assemble/io.xml                |   2 +-
 pulsar-broker/pom.xml                              |  10 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 307 +++++++++++++++++++
 .../pulsar/functions/instance/ContextImpl.java     |  32 +-
 .../pulsar/functions/instance/InstanceUtils.java   |  26 ++
 .../functions/instance/JavaInstanceRunnable.java   |  18 +-
 .../instance/stats/ComponentStatsManager.java      | 170 +++++++++++
 .../instance/{ => stats}/FunctionStatsManager.java | 248 ++++-----------
 .../functions/instance/stats/SinkStatsManager.java | 340 +++++++++++++++++++++
 .../instance/stats/SourceStatsManager.java         | 339 ++++++++++++++++++++
 .../pulsar/functions/instance/ContextImplTest.java |   5 +-
 .../org/apache/pulsar/functions/utils/Utils.java   |  17 ++
 .../functions/worker/rest/api/ComponentImpl.java   |  23 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |   3 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |   3 +-
 .../functions/worker/rest/api/SourceImpl.java      |   3 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |   3 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  11 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   9 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   9 +-
 .../{data-genenator => data-generator}/pom.xml     |   0
 .../io/datagenerator/DataGeneratorPrintSink.java   |   0
 .../io/datagenerator/DataGeneratorSource.java      |   0
 .../org/apache/pulsar/io/datagenerator/Person.java |   0
 .../resources/META-INF/services/pulsar-io.yaml     |   0
 pulsar-io/pom.xml                                  |   2 +-
 26 files changed, 1337 insertions(+), 243 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index a0704c3..10ea149 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -83,7 +83,7 @@
     </file>
 
     <file>
-      
<source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
+      
<source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source>
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 07c9d6f..c451132 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -272,6 +272,13 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-data-generator</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
@@ -328,7 +335,8 @@
                 <echo>copy test examples package</echo>
                 <mkdir dir="${basedir}/src/test/resources"/>
                 <copy 
file="${basedir}/../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar"
 tofile="${basedir}/src/test/resources/pulsar-functions-api-examples.jar"/>
-                </tasks>
+                <copy 
file="${basedir}/../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar"
 tofile="${basedir}/src/test/resources/pulsar-io-data-generator.nar"/>
+              </tasks>
             </configuration>
           </execution>
         </executions>
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 7841453..fa1177d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -40,6 +40,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -65,6 +67,7 @@ import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -78,6 +81,7 @@ import java.util.regex.Pattern;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -191,6 +195,8 @@ public class PulsarFunctionE2ETest {
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
 
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
         Thread.sleep(100);
     }
 
@@ -257,6 +263,28 @@ public class PulsarFunctionE2ETest {
         return functionConfig;
     }
 
+    private static SourceConfig createSourceConfig(String tenant, String 
namespace, String functionName, String sinkTopic) {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(functionName);
+        sourceConfig.setParallelism(1);
+        
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setTopicName(sinkTopic);
+        return sourceConfig;
+    }
+
+    private static SinkConfig createSinkConfig(String tenant, String 
namespace, String functionName, String sourceTopic, String subName) {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(functionName);
+        sinkConfig.setParallelism(1);
+        
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setInputs(Collections.singleton(sourceTopic));
+        sinkConfig.setSourceSubscriptionName(subName);
+        return sinkConfig;
+    }
     /**
      * Validates pulsar sink e2e functionality on functions.
      *
@@ -325,6 +353,285 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
+    public void testPulsarSinkStats() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String functionName = "PulsarSink-test";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, 
functionName, sourceTopic, subscriptionName);
+        admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+        admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        // validate prometheus metrics empty
+        String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_sink_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_sink_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, 
propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0 && 
subStats.msgThroughputOut == totalMsgs;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        // get stats after producing
+        prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        metrics = parseMetrics(prometheusMetrics);
+        m = metrics.get("pulsar_sink_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_sink_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_sink_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_sink_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, (double) totalMsgs);
+        m = metrics.get("pulsar_sink_sink_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_sink_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+    }
+
+    @Test(timeOut = 20000)
+    public void testPulsarSourceStats() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSource-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, functionName, sinkTopic);
+        admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() 
== 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+        assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+
+        String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_source_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("function"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
+        assertTrue(m.value > 0.0);
+    }
+
+    @Test(timeOut = 20000)
     public void testPulsarFunctionStats() throws Exception {
 
         final String namespacePortion = "io";
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 69fd6c8..460ba75 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -34,9 +34,14 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
+import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
+import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -52,7 +57,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
-import static 
org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX;
+import static 
org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
 
 /**
  * This class implements the Context interface exposed to the user.
@@ -87,12 +92,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private final static String[] userMetricsLabelNames;
     static {
         // add label to indicate user metric
-        userMetricsLabelNames = 
Arrays.copyOf(FunctionStatsManager.metricsLabelNames, 
FunctionStatsManager.metricsLabelNames.length + 1);
-        userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] = 
"metric";
+        userMetricsLabelNames = 
Arrays.copyOf(ComponentStatsManager.metricsLabelNames, 
ComponentStatsManager.metricsLabelNames.length + 1);
+        userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] 
= "metric";
     }
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client, List<String> inputTopics,
-                       SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry, String[] metricsLabels) {
+                       SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry, String[] metricsLabels,
+                       Utils.ComponentType componentType) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
@@ -119,9 +125,23 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
 
         this.metricsLabels = metricsLabels;
+        String prefix;
+        switch (componentType) {
+            case FUNCTION:
+                prefix = FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX;
+                break;
+            case SINK:
+                prefix = SinkStatsManager.PULSAR_SINK_METRICS_PREFIX;
+                break;
+            case SOURCE:
+                prefix = SourceStatsManager.PULSAR_SOURCE_METRICS_PREFIX;
+                break;
+            default:
+                throw new RuntimeException("Unknown component type: " + 
componentType);
+        }
         this.userMetricsSummary = Summary.build()
-                .name("pulsar_function_user_metric")
-                .help("Pulsar Function user defined metric.")
+                .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
+                .help("User defined metric.")
                 .labelNames(userMetricsLabelNames)
                 .quantile(0.5, 0.01)
                 .quantile(0.9, 0.01)
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 86a9aa2..9e32736 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -19,15 +19,22 @@
 package org.apache.pulsar.functions.instance;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
 
 import lombok.experimental.UtilityClass;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
 
 import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.functions.utils.Utils;
 
 @UtilityClass
 public class InstanceUtils {
@@ -77,4 +84,23 @@ public class InstanceUtils {
             return Reflections.createInstance(className, baseClass, clsLoader);
         }
     }
+
+    public Utils.ComponentType calculateSubjectType(Function.FunctionDetails 
functionDetails) {
+        Function.SourceSpec sourceSpec = functionDetails.getSource();
+        Function.SinkSpec sinkSpec = functionDetails.getSink();
+        if (sourceSpec.getInputSpecsCount() == 0) {
+            return SOURCE;
+        }
+        // Now its between sink and function
+
+        if (!isEmpty(sinkSpec.getBuiltin())) {
+            // if its built in, its a sink
+            return SINK;
+        }
+
+        if (isEmpty(sinkSpec.getClassName()) || 
sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+            return FUNCTION;
+        }
+        return SINK;
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e858e90..7c36b58 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -116,7 +118,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     // function stats
     @Getter
-    private FunctionStatsManager stats;
+    private ComponentStatsManager stats;
 
     private Record<?> currentRecord;
 
@@ -202,7 +204,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider, collectorRegistry, metricsLabels);
+        return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider,
+                collectorRegistry, metricsLabels, 
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
     }
 
     /**
@@ -216,7 +219,9 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             if (this.collectorRegistry == null) {
                 this.collectorRegistry = new CollectorRegistry();
             }
-            this.stats = new FunctionStatsManager(this.collectorRegistry, 
this.metricsLabels, this.instanceCache.getScheduledExecutorService());
+            this.stats = 
ComponentStatsManager.getStatsManager(this.collectorRegistry, 
this.metricsLabels,
+                    this.instanceCache.getScheduledExecutorService(),
+                    
InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
 
             ContextImpl contextImpl = setupContext();
             javaInstance = setupJavaInstance(contextImpl);
@@ -284,6 +289,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     private void loadJars() throws Exception {
         try {
+            log.info("jarFile: {}", jarFile);
             // Let's first try to treat it as a nar archive
             fnCache.registerFunctionInstanceWithArchive(
                 instanceConfig.getFunctionId(),
@@ -546,6 +552,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         stats.getLatestSystemExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestSystemExceptions(ex);
         });
+        stats.getLatestSourceExceptions().forEach(ex -> {
+            functionStatusBuilder.addLatestSourceExceptions(ex);
+        });
+        stats.getLatestSinkExceptions().forEach(ex -> {
+            functionStatusBuilder.addLatestSinkExceptions(ex);
+        });
         functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
         functionStatusBuilder.setLastInvocationTime((long) 
stats.getLastInvocation());
         return functionStatusBuilder;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
new file mode 100644
index 0000000..3cb654b
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public abstract class ComponentStatsManager implements AutoCloseable {
+
+    protected String[] metricsLabels;
+
+    protected ScheduledFuture<?> scheduledFuture;
+
+    protected final CollectorRegistry collectorRegistry;
+
+    protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);
+
+    public final static String USER_METRIC_PREFIX = "user_metric_";
+
+    public static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster", "fqfn"};
+
+    protected static final String[] exceptionMetricsLabelNames;
+
+    static {
+        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, 
metricsLabelNames.length + 2);
+        exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
+        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
+    }
+
+    public static ComponentStatsManager getStatsManager(CollectorRegistry 
collectorRegistry,
+                                  String[] metricsLabels,
+                                  ScheduledExecutorService 
scheduledExecutorService,
+                                  Utils.ComponentType componentType) {
+        switch (componentType) {
+            case FUNCTION:
+                return new FunctionStatsManager(collectorRegistry, 
metricsLabels, scheduledExecutorService);
+            case SOURCE:
+                return new SourceStatsManager(collectorRegistry, 
metricsLabels, scheduledExecutorService);
+            case SINK:
+                return new SinkStatsManager(collectorRegistry, metricsLabels, 
scheduledExecutorService);
+            default:
+                throw new RuntimeException("Unknown component type: " + 
componentType);
+        }
+    }
+
+    public ComponentStatsManager(CollectorRegistry collectorRegistry,
+                         String[] metricsLabels,
+                         ScheduledExecutorService scheduledExecutorService) {
+
+        this.collectorRegistry = collectorRegistry;
+        this.metricsLabels = metricsLabels;
+
+        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
+            @Override
+            public void run() {
+                try {
+                    reset();
+                } catch (Exception e) {
+                    log.error("Failed to reset metrics for 1min window", e);
+                }
+            }
+        }, 1, 1, TimeUnit.MINUTES);
+    }
+
+    public abstract void reset();
+
+    public abstract void incrTotalReceived();
+
+    public abstract void incrTotalProcessedSuccessfully();
+
+    public abstract void incrSysExceptions(Throwable sysException);
+
+    public abstract void incrUserExceptions(Exception userException);
+
+    public abstract void incrSourceExceptions(Exception userException);
+
+    public abstract void incrSinkExceptions(Exception userException);
+
+    public abstract void setLastInvocation(long ts);
+
+    public abstract void processTimeStart();
+
+    public abstract void processTimeEnd();
+
+    public abstract double getTotalProcessedSuccessfully();
+
+    public abstract double getTotalRecordsReceived();
+
+    public abstract double getTotalSysExceptions();
+
+    public abstract double getTotalUserExceptions();
+
+    public abstract double getLastInvocation();
+
+    public abstract double getAvgProcessLatency();
+
+    public abstract double getTotalProcessedSuccessfully1min();
+
+    public abstract double getTotalRecordsReceived1min();
+
+    public abstract double getTotalSysExceptions1min();
+
+    public abstract double getTotalUserExceptions1min();
+
+    public abstract double getAvgProcessLatency1min();
+
+    public abstract 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestUserExceptions();
+
+    public abstract 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSystemExceptions();
+
+    public abstract 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSourceExceptions();
+
+    public abstract 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSinkExceptions();
+
+    public String getStatsAsString() throws IOException {
+        StringWriter outputWriter = new StringWriter();
+
+        TextFormat.write004(outputWriter, 
collectorRegistry.metricFamilySamples());
+
+        return outputWriter.toString();
+    }
+
+    protected InstanceCommunication.FunctionStatus.ExceptionInformation 
getExceptionInfo(Throwable th, long ts) {
+        InstanceCommunication.FunctionStatus.ExceptionInformation.Builder 
exceptionInfoBuilder =
+                
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
+        String msg = th.getMessage();
+        if (msg != null) {
+            exceptionInfoBuilder.setExceptionString(msg);
+        }
+        return exceptionInfoBuilder.build();
+    }
+
+
+    @Override
+    public void close() {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduledFuture = null;
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
similarity index 64%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 818fdb3..d03bfe2 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -16,25 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.instance.stats;
 
 import com.google.common.collect.EvictingQueue;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
-import io.prometheus.client.exporter.common.TextFormat;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,18 +39,9 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 @Getter
 @Setter
-public class FunctionStatsManager implements AutoCloseable {
-
-    static final String[] metricsLabelNames = {"tenant", "namespace", 
"function", "instance_id", "cluster", "fqfn"};
-    static final String[] exceptionMetricsLabelNames;
-    static {
-        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, 
metricsLabelNames.length + 2);
-        exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
-        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
-    }
+public class FunctionStatsManager extends ComponentStatsManager{
 
     public static final String PULSAR_FUNCTION_METRICS_PREFIX = 
"pulsar_function_";
-    public final static String USER_METRIC_PREFIX = "user_metric_";
 
     /** Declare metric names **/
     public static final String PROCESSED_SUCCESSFULLY_TOTAL = 
"processed_successfully_total";
@@ -81,16 +68,12 @@ public class FunctionStatsManager implements AutoCloseable {
     final Counter statTotalSysExceptions;
 
     final Counter statTotalUserExceptions;
-
-    final Counter statTotalSourceExceptions;
-
-    final Counter statTotalSinkExceptions;
-
+    
     final Summary statProcessLatency;
 
     final Gauge statlastInvocation;
 
-    final Counter statTotalRecordsRecieved;
+    final Counter statTotalRecordsReceived;
     
     // windowed metrics
 
@@ -99,14 +82,10 @@ public class FunctionStatsManager implements AutoCloseable {
     final Counter statTotalSysExceptions1min;
 
     final Counter statTotalUserExceptions1min;
-
-    final Counter statTotalSourceExceptions1min;
-
-    final Counter statTotalSinkExceptions1min;
-
+    
     final Summary statProcessLatency1min;
 
-    final Counter statTotalRecordsRecieved1min;
+    final Counter statTotalRecordsReceived1min;
 
     // exceptions
 
@@ -122,47 +101,28 @@ public class FunctionStatsManager implements 
AutoCloseable {
     private final Counter.Child _statTotalProcessedSuccessfully;
     private final Counter.Child _statTotalSysExceptions;
     private final Counter.Child _statTotalUserExceptions;
-    private final Counter.Child _statTotalSourceExceptions;
-    private final Counter.Child _statTotalSinkExceptions;
     private final Summary.Child _statProcessLatency;
     private final Gauge.Child _statlastInvocation;
-    private final Counter.Child _statTotalRecordsRecieved;
+    private final Counter.Child _statTotalRecordsReceived;
     private Counter.Child _statTotalProcessedSuccessfully1min;
     private Counter.Child _statTotalSysExceptions1min;
     private Counter.Child _statTotalUserExceptions1min;
-    private Counter.Child _statTotalSourceExceptions1min;
-    private Counter.Child _statTotalSinkExceptions1min;
     private Summary.Child _statProcessLatency1min;
-    private Counter.Child _statTotalRecordsRecieved1min;
-
-    private String[] metricsLabels;
-
-    private ScheduledFuture<?> scheduledFuture;
-
-    private final CollectorRegistry collectorRegistry;
+    private Counter.Child _statTotalRecordsReceived1min;
 
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestUserExceptions = EvictingQueue.create(10);
     @Getter
     private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
-    @Getter
-    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSourceExceptions = EvictingQueue.create(10);
-    @Getter
-    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSinkExceptions = EvictingQueue.create(10);
 
     private final RateLimiter userExceptionRateLimiter;
 
     private final RateLimiter sysExceptionRateLimiter;
 
-    private final RateLimiter sourceExceptionRateLimiter;
-
-    private final RateLimiter sinkExceptionRateLimiter;
-
-    public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService scheduledExecutorService) {
-
-        this.collectorRegistry = collectorRegistry;
-
-        this.metricsLabels = metricsLabels;
+    public FunctionStatsManager(CollectorRegistry collectorRegistry,
+                                String[] metricsLabels,
+                                ScheduledExecutorService 
scheduledExecutorService) {
+        super(collectorRegistry, metricsLabels, scheduledExecutorService);
 
         statTotalProcessedSuccessfully = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESSED_SUCCESSFULLY_TOTAL)
@@ -185,20 +145,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
                 .register(collectorRegistry);
         _statTotalUserExceptions = 
statTotalUserExceptions.labels(metricsLabels);
 
-        statTotalSourceExceptions = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
-                .help("Total number of source exceptions.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-        _statTotalSourceExceptions = 
statTotalSourceExceptions.labels(metricsLabels);
-
-        statTotalSinkExceptions = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
-                .help("Total number of sink exceptions.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-        _statTotalSinkExceptions = 
statTotalSinkExceptions.labels(metricsLabels);
-
         statProcessLatency = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
                 .help("Process latency in milliseconds.")
@@ -217,12 +163,12 @@ public class FunctionStatsManager implements 
AutoCloseable {
                 .register(collectorRegistry);
         _statlastInvocation = statlastInvocation.labels(metricsLabels);
 
-        statTotalRecordsRecieved = Counter.build()
+        statTotalRecordsReceived = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of messages received from source.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
-        _statTotalRecordsRecieved = 
statTotalRecordsRecieved.labels(metricsLabels);
+        _statTotalRecordsReceived = 
statTotalRecordsReceived.labels(metricsLabels);
 
         statTotalProcessedSuccessfully1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + 
PROCESSED_SUCCESSFULLY_TOTAL_1min)
@@ -245,20 +191,6 @@ public class FunctionStatsManager implements AutoCloseable 
{
                 .register(collectorRegistry);
         _statTotalUserExceptions1min = 
statTotalUserExceptions1min.labels(metricsLabels);
 
-        statTotalSourceExceptions1min = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + 
SOURCE_EXCEPTIONS_TOTAL_1min)
-                .help("Total number of source exceptions in the last 1 
minute.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-        _statTotalSourceExceptions1min = 
statTotalSourceExceptions1min.labels(metricsLabels);
-
-        statTotalSinkExceptions1min = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + 
SINK_EXCEPTIONS_TOTAL_1min)
-                .help("Total number of sink exceptions in the last 1 minute.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-        _statTotalSinkExceptions1min = 
statTotalSinkExceptions1min.labels(metricsLabels);
-
         statProcessLatency1min = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
                 .help("Process latency in milliseconds in the last 1 minute.")
@@ -270,12 +202,12 @@ public class FunctionStatsManager implements 
AutoCloseable {
                 .register(collectorRegistry);
         _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
 
-        statTotalRecordsRecieved1min = Counter.build()
+        statTotalRecordsReceived1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
                 .help("Total number of messages received from source in the 
last 1 minute.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
-        _statTotalRecordsRecieved1min = 
statTotalRecordsRecieved1min.labels(metricsLabels);
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
 
         userExceptions = Gauge.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
@@ -300,22 +232,8 @@ public class FunctionStatsManager implements AutoCloseable 
{
                 .help("Exception from sink.")
                 .register(collectorRegistry);
 
-        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
-            @Override
-            public void run() {
-                try {
-                    reset();
-                } catch (Exception e) {
-                    log.error("Failed to reset metrics for 1min window", e);
-                }
-            }
-        }, 1, 1, TimeUnit.MINUTES);
-
         userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 
1, TimeUnit.MINUTES);
-        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
-        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
-
     }
 
     public void addUserException(Exception ex) {
@@ -346,87 +264,54 @@ public class FunctionStatsManager implements 
AutoCloseable {
         }
     }
 
-    public void addSourceException(Throwable ex) {
-        long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
-        latestSourceExceptions.add(info);
-
-        // report exception throw prometheus
-        if (sourceExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
-            sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
-        }
-    }
-
-    public void addSinkException(Throwable ex) {
-        long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
-        latestSinkExceptions.add(info);
-
-        // report exception throw prometheus
-        if (sinkExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
-            sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
-        }
-    }
-
-    private InstanceCommunication.FunctionStatus.ExceptionInformation 
getExceptionInfo(Throwable th, long ts) {
-        InstanceCommunication.FunctionStatus.ExceptionInformation.Builder 
exceptionInfoBuilder =
-                
InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts);
-        String msg = th.getMessage();
-        if (msg != null) {
-            exceptionInfoBuilder.setExceptionString(msg);
-        }
-        return exceptionInfoBuilder.build();
-    }
-
+    @Override
     public void incrTotalReceived() {
-        _statTotalRecordsRecieved.inc();
-        _statTotalRecordsRecieved1min.inc();
+        _statTotalRecordsReceived.inc();
+        _statTotalRecordsReceived1min.inc();
     }
 
+    @Override
     public void incrTotalProcessedSuccessfully() {
         _statTotalProcessedSuccessfully.inc();
         _statTotalProcessedSuccessfully1min.inc();
     }
 
+    @Override
     public void incrSysExceptions(Throwable sysException) {
         _statTotalSysExceptions.inc();
         _statTotalSysExceptions1min.inc();
         addSystemException(sysException);
     }
 
+    @Override
     public void incrUserExceptions(Exception userException) {
         _statTotalUserExceptions.inc();
         _statTotalUserExceptions1min.inc();
         addUserException(userException);
     }
 
-    public void incrSourceExceptions(Exception userException) {
-        _statTotalSourceExceptions.inc();
-        _statTotalSourceExceptions1min.inc();
-        addSourceException(userException);
+    @Override
+    public void incrSourceExceptions(Exception ex) {
+        incrSysExceptions(ex);
     }
 
-    public void incrSinkExceptions(Exception userException) {
-        _statTotalSinkExceptions.inc();
-        _statTotalSinkExceptions1min.inc();
-        addSinkException(userException);
+    @Override
+    public void incrSinkExceptions(Exception ex) {
+        incrSysExceptions(ex);
     }
 
+    @Override
     public void setLastInvocation(long ts) {
         _statlastInvocation.set(ts);
     }
 
     private Long processTimeStart;
+    @Override
     public void processTimeStart() {
         processTimeStart = System.nanoTime();
     }
 
+    @Override
     public void processTimeEnd() {
         if (processTimeStart != null) {
             double endTimeMs = ((double) System.nanoTime() - processTimeStart) 
/ 1.0E6D;
@@ -435,30 +320,27 @@ public class FunctionStatsManager implements 
AutoCloseable {
         }
     }
 
+    @Override
     public double getTotalProcessedSuccessfully() {
         return _statTotalProcessedSuccessfully.get();
     }
 
+    @Override
     public double getTotalRecordsReceived() {
-        return _statTotalRecordsRecieved.get();
+        return _statTotalRecordsReceived.get();
     }
 
+    @Override
     public double getTotalSysExceptions() {
         return _statTotalSysExceptions.get();
     }
 
+    @Override
     public double getTotalUserExceptions() {
         return _statTotalUserExceptions.get();
     }
-
-    public double getTotalSourceExceptions() {
-        return _statTotalSourceExceptions.get();
-    }
-
-    public double getTotalSinkExceptions() {
-        return _statTotalSinkExceptions.get();
-    }
-
+    
+    @Override
     public double getLastInvocation() {
         return _statlastInvocation.get();
     }
@@ -484,33 +366,40 @@ public class FunctionStatsManager implements 
AutoCloseable {
         return _statProcessLatency.get().quantiles.get(0.999);
     }
 
+    @Override
     public double getTotalProcessedSuccessfully1min() {
         return _statTotalProcessedSuccessfully1min.get();
     }
 
+    @Override
     public double getTotalRecordsReceived1min() {
-        return _statTotalRecordsRecieved1min.get();
+        return _statTotalRecordsReceived1min.get();
     }
 
+    @Override
     public double getTotalSysExceptions1min() {
         return _statTotalSysExceptions1min.get();
     }
 
+    @Override
     public double getTotalUserExceptions1min() {
         return _statTotalUserExceptions1min.get();
     }
-
-    public double getTotalSourceExceptions1min() {
-        return _statTotalSourceExceptions1min.get();
+    
+    @Override
+    public double getAvgProcessLatency1min() {
+        return _statProcessLatency1min.get().count <= 0.0
+                ? 0 : _statProcessLatency1min.get().sum / 
_statProcessLatency1min.get().count;
     }
 
-    public double getTotalSinkExceptions1min() {
-        return _statTotalSinkExceptions1min.get();
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSourceExceptions() {
+        return EMPTY_QUEUE;
     }
 
-    public double getAvgProcessLatency1min() {
-        return _statProcessLatency1min.get().count <= 0.0
-                ? 0 : _statProcessLatency1min.get().sum / 
_statProcessLatency1min.get().count;
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSinkExceptions() {
+        return EMPTY_QUEUE;
     }
 
     public double getProcessLatency50P1min() {
@@ -529,6 +418,7 @@ public class FunctionStatsManager implements AutoCloseable {
         return _statProcessLatency1min.get().quantiles.get(0.999);
     }
 
+    @Override
     public void reset() {
         statTotalProcessedSuccessfully1min.clear();
         _statTotalProcessedSuccessfully1min = 
statTotalProcessedSuccessfully1min.labels(metricsLabels);
@@ -539,37 +429,13 @@ public class FunctionStatsManager implements 
AutoCloseable {
         statTotalUserExceptions1min.clear();
         _statTotalUserExceptions1min = 
statTotalUserExceptions1min.labels(metricsLabels);
 
-        statTotalSourceExceptions1min.clear();
-        _statTotalSourceExceptions1min = 
statTotalSourceExceptions1min.labels(metricsLabels);
-
-        statTotalSinkExceptions1min.clear();
-        _statTotalSinkExceptions1min = 
statTotalSinkExceptions1min.labels(metricsLabels);
-
         statProcessLatency1min.clear();
         _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);
 
-        statTotalRecordsRecieved1min.clear();
-        _statTotalRecordsRecieved1min = 
statTotalRecordsRecieved1min.labels(metricsLabels);
+        statTotalRecordsReceived1min.clear();
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
 
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
-        latestSourceExceptions.clear();
-        latestSinkExceptions.clear();
-    }
-
-    public String getStatsAsString() throws IOException {
-        StringWriter outputWriter = new StringWriter();
-
-        TextFormat.write004(outputWriter, 
collectorRegistry.metricFamilySamples());
-
-        return outputWriter.toString();
-    }
-
-    @Override
-    public void close() {
-        if (scheduledFuture != null) {
-            scheduledFuture.cancel(false);
-            scheduledFuture = null;
-        }
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
new file mode 100644
index 0000000..14e4c21
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SinkStatsManager extends ComponentStatsManager {
+
+    public static final String PULSAR_SINK_METRICS_PREFIX = "pulsar_sink_";
+
+    /** Declare metric names **/
+    public static final String SYSTEM_EXCEPTIONS_TOTAL = 
"system_exceptions_total";
+    public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total";
+    public static final String LAST_INVOCATION = "last_invocation";
+    public static final String RECEIVED_TOTAL = "received_total";
+    public static final String WRITTEN_TOTAL = "written_total";
+
+    public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = 
"system_exceptions_total_1min";
+    public static final String SINK_EXCEPTIONS_TOTAL_1min = 
"sink_exceptions_total_1min";
+    public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+    public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+    /** Declare Prometheus stats **/
+
+    private final Counter statTotalRecordsReceived;
+
+    private final Counter statTotalSysExceptions;
+
+    private final Counter statTotalSinkExceptions;
+
+    private final Counter statTotalWritten;
+
+    private final Gauge statlastInvocation;
+
+    // windowed metrics
+    private final Counter statTotalRecordsReceived1min;
+
+    private final Counter statTotalSysExceptions1min;
+
+    private final Counter statTotalSinkExceptions1min;
+
+    private final Counter statTotalWritten1min;
+
+    // exceptions
+
+    final Gauge sysExceptions;
+
+    final Gauge sinkExceptions;
+
+    // As an optimization
+    private final Counter.Child _statTotalRecordsReceived;
+    private final Counter.Child _statTotalSysExceptions;
+    private final Counter.Child _statTotalSinkExceptions;
+    private final Counter.Child _statTotalWritten;
+    private final Gauge.Child _statlastInvocation;
+
+    private Counter.Child _statTotalRecordsReceived1min;
+    private Counter.Child _statTotalSysExceptions1min;
+    private Counter.Child _statTotalSinkExceptions1min;
+    private Counter.Child _statTotalWritten1min;
+
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSinkExceptions = EvictingQueue.create(10);
+
+    private final RateLimiter sysExceptionRateLimiter;
+
+    private final RateLimiter sinkExceptionRateLimiter;
+
+
+    public SinkStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService
+            scheduledExecutorService) {
+        super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+        statTotalRecordsReceived = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL)
+                .help("Total number of records sink has received from Pulsar 
topic(s).")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived = 
statTotalRecordsReceived.labels(metricsLabels);
+
+        statTotalSysExceptions = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
+                .help("Total number of system exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+        statTotalSinkExceptions = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
+                .help("Total number of sink exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSinkExceptions = 
statTotalSinkExceptions.labels(metricsLabels);
+
+        statTotalWritten = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL)
+                .help("Total number of records processed by sink.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+        statlastInvocation = Gauge.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION)
+                .help("The timestamp of the last invocation of the sink.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+        statTotalRecordsReceived1min = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+                .help("Total number of messages sink has received from Pulsar 
topic(s) in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + 
SYSTEM_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of system exceptions in the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions1min = 
statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSinkExceptions1min = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of sink exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSinkExceptions1min = 
statTotalSinkExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min = Counter.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min)
+                .help("Total number of records processed by sink the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        sysExceptions = Gauge.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + "system_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from system code.")
+                .register(collectorRegistry);
+
+        sinkExceptions = Gauge.build()
+                .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from sink.")
+                .register(collectorRegistry);
+
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 
1, TimeUnit.MINUTES);
+        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public void reset() {
+        statTotalRecordsReceived1min.clear();
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min.clear();
+        _statTotalSysExceptions1min = 
statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSinkExceptions1min.clear();
+        _statTotalSinkExceptions1min = 
statTotalSinkExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min.clear();
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        latestSystemExceptions.clear();
+        latestSinkExceptions.clear();
+    }
+
+    @Override
+    public void incrTotalReceived() {
+        _statTotalRecordsReceived.inc();
+        _statTotalRecordsReceived1min.inc();
+    }
+
+    @Override
+    public void incrTotalProcessedSuccessfully() {
+        _statTotalWritten.inc();
+        _statTotalWritten1min.inc();
+    }
+
+    @Override
+    public void incrSysExceptions(Throwable ex) {
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
+        latestSystemExceptions.add(info);
+
+        // report exception throw prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    @Override
+    public void incrUserExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    @Override
+    public void incrSourceExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    @Override
+    public void incrSinkExceptions(Exception ex) {
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
+        latestSinkExceptions.add(info);
+
+        // report exception throw prometheus
+        if (sinkExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    @Override
+    public void setLastInvocation(long ts) {
+        _statlastInvocation.set(ts);
+    }
+
+    @Override
+    public void processTimeStart() {
+        //no-p[
+    }
+
+    @Override
+    public void processTimeEnd() {
+        //no-op
+    }
+
+    @Override
+    public double getTotalProcessedSuccessfully() {
+        return _statTotalWritten.get();
+    }
+
+    @Override
+    public double getTotalRecordsReceived() {
+        return _statTotalRecordsReceived.get();
+    }
+
+    @Override
+    public double getTotalSysExceptions() {
+        return _statTotalSysExceptions.get();
+    }
+
+    @Override
+    public double getTotalUserExceptions() {
+        return 0;
+    }
+
+    @Override
+    public double getLastInvocation() {
+        return _statlastInvocation.get();
+    }
+
+    @Override
+    public double getAvgProcessLatency() {
+        return 0;
+    }
+
+    @Override
+    public double getTotalProcessedSuccessfully1min() {
+        return _statTotalWritten1min.get();
+    }
+
+    @Override
+    public double getTotalRecordsReceived1min() {
+        return _statTotalRecordsReceived1min.get();
+    }
+
+    @Override
+    public double getTotalSysExceptions1min() {
+        return _statTotalSysExceptions1min.get();
+    }
+
+    @Override
+    public double getTotalUserExceptions1min() {
+        return 0;
+    }
+
+    @Override
+    public double getAvgProcessLatency1min() {
+        return 0;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestUserExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSystemExceptions() {
+        return latestSystemExceptions;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSourceExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSinkExceptions() {
+        return latestSinkExceptions;
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
new file mode 100644
index 0000000..5679f2e
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.stats;
+
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import lombok.Getter;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SourceStatsManager extends ComponentStatsManager {
+
+    public static final String PULSAR_SOURCE_METRICS_PREFIX = "pulsar_source_";
+
+    /** Declare metric names **/
+    public static final String SYSTEM_EXCEPTIONS_TOTAL = 
"system_exceptions_total";
+    public static final String SOURCE_EXCEPTIONS_TOTAL = 
"source_exceptions_total";
+    public static final String LAST_INVOCATION = "last_invocation";
+    public static final String RECEIVED_TOTAL = "received_total";
+    public static final String WRITTEN_TOTAL = "written_total";
+
+    public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = 
"system_exceptions_total_1min";
+    public static final String SOURCE_EXCEPTIONS_TOTAL_1min = 
"source_exceptions_total_1min";
+    public static final String RECEIVED_TOTAL_1min = "received_total_1min";
+    public static final String WRITTEN_TOTAL_1min = "written_total_1min";
+
+    /** Declare Prometheus stats **/
+
+    private final Counter statTotalRecordsReceived;
+
+    private final Counter statTotalSysExceptions;
+
+    private final Counter statTotalSourceExceptions;
+
+    private final Counter statTotalWritten;
+
+    private final Gauge statlastInvocation;
+
+    // windowed metrics
+    private final Counter statTotalRecordsReceived1min;
+
+    private final Counter statTotalSysExceptions1min;
+
+    private final Counter statTotalSourceExceptions1min;
+
+    private final Counter statTotalWritten1min;
+
+    // exceptions
+
+    final Gauge sysExceptions;
+
+    final Gauge sourceExceptions;
+
+    // As an optimization
+    private final Counter.Child _statTotalRecordsReceived;
+    private final Counter.Child _statTotalSysExceptions;
+    private final Counter.Child _statTotalSourceExceptions;
+    private final Counter.Child _statTotalWritten;
+    private final Gauge.Child _statlastInvocation;
+
+    private Counter.Child _statTotalRecordsReceived1min;
+    private Counter.Child _statTotalSysExceptions1min;
+    private Counter.Child _statTotalSourceExceptions1min;
+    private Counter.Child _statTotalWritten1min;
+
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSystemExceptions = EvictingQueue.create(10);
+    @Getter
+    private 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
latestSourceExceptions = EvictingQueue.create(10);
+
+    protected final RateLimiter sysExceptionRateLimiter;
+
+    protected final RateLimiter sourceExceptionRateLimiter;
+
+    public SourceStatsManager(CollectorRegistry collectorRegistry, String[] 
metricsLabels, ScheduledExecutorService
+            scheduledExecutorService) {
+        super(collectorRegistry, metricsLabels, scheduledExecutorService);
+
+        statTotalRecordsReceived = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL)
+                .help("Total number of records received from source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived = 
statTotalRecordsReceived.labels(metricsLabels);
+
+        statTotalSysExceptions = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
+                .help("Total number of system exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);
+
+        statTotalSourceExceptions = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
+                .help("Total number of source exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSourceExceptions = 
statTotalSourceExceptions.labels(metricsLabels);
+
+        statTotalWritten = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL)
+                .help("Total number of records written to a Pulsar topic.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten = statTotalWritten.labels(metricsLabels);
+
+        statlastInvocation = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION)
+                .help("The timestamp of the last invocation of the source.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statlastInvocation = statlastInvocation.labels(metricsLabels);
+
+        statTotalRecordsReceived1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min)
+                .help("Total number of records received from source in the 
last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + 
SYSTEM_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of system exceptions in the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSysExceptions1min = 
statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + 
SOURCE_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of source exceptions in the last 1 
minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalSourceExceptions1min = 
statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min = Counter.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min)
+                .help("Total number of records written to a Pulsar topic in 
the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        sysExceptions = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from system code.")
+                .register(collectorRegistry);
+
+        sourceExceptions = Gauge.build()
+                .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from source.")
+                .register(collectorRegistry);
+
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 
1, TimeUnit.MINUTES);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 
5, 1, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public void reset() {
+        statTotalRecordsReceived1min.clear();
+        _statTotalRecordsReceived1min = 
statTotalRecordsReceived1min.labels(metricsLabels);
+
+        statTotalSysExceptions1min.clear();
+        _statTotalSysExceptions1min = 
statTotalSysExceptions1min.labels(metricsLabels);
+
+        statTotalSourceExceptions1min.clear();
+        _statTotalSourceExceptions1min = 
statTotalSourceExceptions1min.labels(metricsLabels);
+
+        statTotalWritten1min.clear();
+        _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels);
+
+        latestSystemExceptions.clear();
+        latestSourceExceptions.clear();
+    }
+
+    @Override
+    public void incrTotalReceived() {
+        _statTotalRecordsReceived.inc();
+        _statTotalRecordsReceived1min.inc();
+    }
+
+    @Override
+    public void incrTotalProcessedSuccessfully() {
+        _statTotalWritten.inc();
+        _statTotalWritten1min.inc();
+    }
+
+    @Override
+    public void incrSysExceptions(Throwable ex) {
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
+        latestSystemExceptions.add(info);
+
+        // report exception throw prometheus
+        if (sysExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            sysExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    @Override
+    public void incrUserExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    @Override
+    public void incrSourceExceptions(Exception ex) {
+        long ts = System.currentTimeMillis();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
+        latestSourceExceptions.add(info);
+
+        // report exception throw prometheus
+        if (sourceExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, 
metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = 
ex.getMessage() != null ? ex.getMessage() : "";
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = 
String.valueOf(ts);
+            sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    @Override
+    public void incrSinkExceptions(Exception ex) {
+        incrSysExceptions(ex);
+    }
+
+    @Override
+    public void setLastInvocation(long ts) {
+        _statlastInvocation.set(ts);
+    }
+
+    @Override
+    public void processTimeStart() {
+        //no-op
+    }
+
+    @Override
+    public void processTimeEnd() {
+        //no-op
+    }
+
+    @Override
+    public double getTotalProcessedSuccessfully() {
+        return _statTotalWritten.get();
+    }
+
+    @Override
+    public double getTotalRecordsReceived() {
+        return _statTotalRecordsReceived.get();
+    }
+
+    @Override
+    public double getTotalSysExceptions() {
+        return _statTotalSysExceptions.get();
+    }
+
+    @Override
+    public double getTotalUserExceptions() {
+        return 0;
+    }
+
+    @Override
+    public double getLastInvocation() {
+        return _statlastInvocation.get();
+    }
+
+    @Override
+    public double getAvgProcessLatency() {
+        return 0;
+    }
+
+    @Override
+    public double getTotalProcessedSuccessfully1min() {
+        return _statTotalWritten1min.get();
+    }
+
+    @Override
+    public double getTotalRecordsReceived1min() {
+        return _statTotalRecordsReceived1min.get();
+    }
+
+    @Override
+    public double getTotalSysExceptions1min() {
+        return _statTotalSysExceptions1min.get();
+    }
+
+    @Override
+    public double getTotalUserExceptions1min() {
+        return 0;
+    }
+
+    @Override
+    public double getAvgProcessLatency1min() {
+        return 0;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestUserExceptions() {
+        return EvictingQueue.create(0);
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSystemExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSourceExceptions() {
+        return EMPTY_QUEUE;
+    }
+
+    @Override
+    public 
EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> 
getLatestSinkExceptions() {
+        return EvictingQueue.create(0);
+    }
+}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 77427f7..278cd5a 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -39,6 +39,7 @@ import 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import 
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -75,8 +76,8 @@ public class ContextImplTest {
             logger,
             client,
             new ArrayList<>(),
-            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0]
-        );
+            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0],
+                Utils.ComponentType.FUNCTION);
     }
 
     @Test(expected = IllegalStateException.class)
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 1c5d64d..b50bcd5 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -364,4 +364,21 @@ public class Utils {
                                                      String functionName, int 
instanceId) {
         return String.format("%s/%s/%s:%d", tenant, namespace, functionName, 
instanceId);
     }
+
+    public enum ComponentType {
+        FUNCTION("Function"),
+        SOURCE("Source"),
+        SINK("Sink");
+
+        private final String componentName;
+
+        ComponentType(String componentName) {
+            this.componentName = componentName;
+        }
+
+        @Override
+        public String toString() {
+            return componentName;
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0e4b1b1..50afdc6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -72,9 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
 import static org.apache.pulsar.functions.utils.Utils.*;
-import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
-import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
-import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -122,23 +122,6 @@ import net.jodah.typetools.TypeResolver;
 @Slf4j
 public abstract class ComponentImpl {
 
-    public enum ComponentType {
-        FUNCTION("Function"),
-        SOURCE("Source"),
-        SINK("Sink");
-
-        private final String componentName;
-
-        ComponentType(String componentName) {
-            this.componentName = componentName;
-        }
-
-        @Override
-        public String toString() {
-            return componentName;
-        }
-    }
-
     private final AtomicReference<StorageClient> storageClient = new 
AtomicReference<>();
     protected final Supplier<WorkerService> workerServiceSupplier;
     protected final ComponentType componentType;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index f3e41a1..be8fd49 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,6 +24,7 @@ import 
org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -199,7 +200,7 @@ public class FunctionsImpl extends ComponentImpl {
     }
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.FUNCTION);
+        super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
     }
 
     /**
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 846900f..4a801b2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
@@ -206,7 +207,7 @@ public class SinkImpl extends ComponentImpl {
     }
 
     public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SINK);
+        super(workerServiceSupplier, Utils.ComponentType.SINK);
     }
 
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(final String tenant,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index ac096bb..ec7ba30 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
@@ -204,7 +205,7 @@ public class SourceImpl extends ComponentImpl {
     }
 
     public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SOURCE);
+        super(workerServiceSupplier, Utils.ComponentType.SOURCE);
     }
 
     public SourceStatus getSourceStatus(final String tenant,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index aa22650..d8d54c9 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -76,6 +76,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
 import static org.apache.pulsar.functions.utils.Utils.mergeJson;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -179,7 +180,7 @@ public class FunctionApiV2ResourceTest {
 
         FunctionsImpl functions = spy(new FunctionsImpl(() -> 
mockedWorkerService));
 
-        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+        doReturn(FUNCTION).when(functions).calculateSubjectType(any());
 
         this.resource = spy(new FunctionsImplV2(functions));
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 580345e..7695993 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -76,6 +76,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -177,7 +180,7 @@ public class FunctionApiV3ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+        doReturn(FUNCTION).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1383,9 +1386,9 @@ public class FunctionApiV3ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        doReturn(SOURCE).when(this.resource).calculateSubjectType(f1);
+        doReturn(FUNCTION).when(this.resource).calculateSubjectType(f2);
+        doReturn(SINK).when(this.resource).calculateSubjectType(f3);
 
         List<String> functionList = listDefaultFunctions();
         assertEquals(functions, functionList);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index bec3ca9..9e55800 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -175,7 +174,7 @@ public class SinkApiV3ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1308,9 +1307,9 @@ public class SinkApiV3ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         List<String> sinkList = listDefaultSinks();
         assertEquals(functions, sinkList);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 72d23e2..6ba8914 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -170,7 +169,7 @@ public class SourceApiV3ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1322,9 +1321,9 @@ public class SourceApiV3ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         List<String> sourceList = listDefaultSources();
         assertEquals(functions, sourceList);
diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-generator/pom.xml
similarity index 100%
rename from pulsar-io/data-genenator/pom.xml
rename to pulsar-io/data-generator/pom.xml
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
similarity index 100%
rename from 
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
rename to 
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
similarity index 100%
rename from 
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
rename to 
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
 
b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
similarity index 100%
rename from 
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
rename to 
pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
diff --git 
a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
similarity index 100%
rename from 
pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
rename to 
pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 628686b..ea433ce 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -42,7 +42,7 @@
     <module>kinesis</module>
     <module>hdfs3</module>
     <module>jdbc</module>
-    <module>data-genenator</module>
+    <module>data-generator</module>
     <module>elastic-search</module>
     <module>kafka-connect-adaptor</module>
     <module>debezium</module>

Reply via email to