This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f779b4  Add function metrics with function-stats to get metrics on-demand (#2130)
5f779b4 is described below

commit 5f779b4ce541e1201354ca5454bebf1276e93f0b
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Jul 16 14:00:07 2018 -0700

    Add function metrics with function-stats to get metrics on-demand (#2130)
---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 84 ++++++++++++++++++++--
 .../functions/instance/JavaInstanceRunnable.java   | 22 ++++--
 .../src/main/python/InstanceCommunication_pb2.py   | 44 +++++++-----
 .../instance/src/main/python/python_instance.py    |  1 +
 .../src/main/proto/InstanceCommunication.proto     |  1 +
 .../pulsar/functions/runtime/ThreadRuntime.java    | 13 +++-
 6 files changed, 133 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5d70525..3ece269 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -48,22 +48,26 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -244,6 +248,7 @@ public class PulsarSinkE2ETest {
         final String sinkTopic = "persistent://" + replNamespace + "/output";
         final String propertyKey = "key";
         final String propertyValue = "value";
+        final String functionName = "PulsarSink-test";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
@@ -254,7 +259,7 @@ public class PulsarSinkE2ETest {
 
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, "PulsarSink-test",
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
                 sinkTopic);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
@@ -298,7 +303,76 @@ public class PulsarSinkE2ETest {
 
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String sinkName, String sinkTopic) {
+    
+    @Test(timeOut = 20000)
+    public void testPulsarSinkStats() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarSink-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                sinkTopic);
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        // try to update function to test: update-function functionality
+        admin.functions().updateFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 
1);
+
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, 
propertyValue).value(data.getBytes()).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
+                        .next();
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 500);
+
+        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+        functionRuntimeManager.updateRates();
+        FunctionStatusList functionStats = 
functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
+                functionName);
+
+        int numInstances = functionStats.getFunctionStatusListCount();
+        Assert.assertEquals(numInstances, 1);
+
+        FunctionStatus stats = 
functionStats.getFunctionStatusListList().get(0);
+        Map<String, DataDigest> metricsData = 
stats.getMetrics().getMetricsMap();
+
+        double count = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
+        double success = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+        Assert.assertEquals((int) count, totalMsgs);
+        Assert.assertEquals((int) success, totalMsgs);
+    }
+
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic) {
 
         File file = new File(jarFile);
         try {
@@ -312,7 +386,7 @@ public class PulsarSinkE2ETest {
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         functionDetailsBuilder.setTenant(tenant);
         functionDetailsBuilder.setNamespace(namespace);
-        functionDetailsBuilder.setName(sinkName);
+        functionDetailsBuilder.setName(functionName);
         functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
         functionDetailsBuilder.setParallelism(1);
         functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4c29232..33006b2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -108,6 +108,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 
     private Source source;
     private Sink sink;
+    
+    public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
+    public static final String METRICS_TOTAL_SUCCESS = 
"__total_successfully_processed__";
+    public static final String METRICS_TOTAL_SYS_EXCEPTION = 
"__total_system_exceptions__";
+    public static final String METRICS_TOTAL_USER_EXCEPTION = 
"__total_user_exceptions__";
+    public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = 
"__total_deserialization_exceptions__";
+    public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = 
"__total_serialization_exceptions__";
+    public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__";
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -418,17 +426,17 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics("__total_processed__", 
stats.getStats().getTotalProcessed(), bldr);
-        addSystemMetrics("__total_successfully_processed__", 
stats.getStats().getTotalSuccessfullyProcessed(),
+        addSystemMetrics(METRICS_TOTAL_PROCESSED, 
stats.getStats().getTotalProcessed(), bldr);
+        addSystemMetrics(METRICS_TOTAL_SUCCESS, 
stats.getStats().getTotalSuccessfullyProcessed(),
                 bldr);
-        addSystemMetrics("__total_system_exceptions__", 
stats.getStats().getTotalSystemExceptions(), bldr);
-        addSystemMetrics("__total_user_exceptions__", 
stats.getStats().getTotalUserExceptions(), bldr);
+        addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, 
stats.getStats().getTotalSystemExceptions(), bldr);
+        addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, 
stats.getStats().getTotalUserExceptions(), bldr);
         stats.getStats().getTotalDeserializationExceptions().forEach((topic, 
count) -> {
-            addSystemMetrics("__total_deserialization_exceptions__" + topic, 
count, bldr);
+            addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, 
count, bldr);
         });
-        addSystemMetrics("__total_serialization_exceptions__",
+        addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION,
                 stats.getStats().getTotalSerializationExceptions(), bldr);
-        addSystemMetrics("__avg_latency_ms__", 
stats.getStats().computeLatency(), bldr);
+        addSystemMetrics(METRICS_AVG_LATENCY, 
stats.getStats().computeLatency(), bldr);
         return bldr;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index 49c77d6..a16399e 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=606,
-  serialized_end=675,
+  serialized_start=643,
+  serialized_end=712,
 )
 
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=677,
-  serialized_end=741,
+  serialized_start=714,
+  serialized_end=778,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -225,6 +225,13 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='metrics', full_name='proto.FunctionStatus.metrics', index=14,
+      number=15, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -238,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=741,
+  serialized_end=778,
 )
 
 
@@ -268,8 +275,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=743,
-  serialized_end=814,
+  serialized_start=780,
+  serialized_end=851,
 )
 
 
@@ -320,8 +327,8 @@ _METRICSDATA_DATADIGEST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=882,
-  serialized_end=948,
+  serialized_start=919,
+  serialized_end=985,
 )
 
 _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
@@ -357,8 +364,8 @@ _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=950,
-  serialized_end=1027,
+  serialized_start=987,
+  serialized_end=1064,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -387,8 +394,8 @@ _METRICSDATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=817,
-  serialized_end=1027,
+  serialized_start=854,
+  serialized_end=1064,
 )
 
 
@@ -418,8 +425,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1029,
-  serialized_end=1065,
+  serialized_start=1066,
+  serialized_end=1102,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -427,6 +434,7 @@ 
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS
 _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = 
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
 _FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = 
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
 _FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = 
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY
+_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA
 _FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = 
_FUNCTIONSTATUS
 _METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
 _METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = 
_METRICSDATA_DATADIGEST
@@ -512,8 +520,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1068,
-  serialized_end=1416,
+  serialized_start=1105,
+  serialized_end=1453,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 70b78f0..bb16253 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -346,6 +346,7 @@ class PythonInstance(object):
     status.serializationExceptions = self.total_stats.nserialization_exceptions
     status.averageLatency = self.total_stats.compute_latency()
     status.lastInvocationTime = self.total_stats.lastinvocationtime
+    status.metrics.CopyFrom(self.get_metrics())
     return status
 
   def join(self):
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index ed8c95a..1a80f5e 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -49,6 +49,7 @@ message FunctionStatus {
     // expressed in ms since epoch
     int64 lastInvocationTime = 13;
     string instanceId = 14;
+    MetricsData metrics = 15;
 }
 
 message FunctionStatusList {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 3b53fb8..5cb8ce0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -98,15 +98,24 @@ class ThreadRuntime implements Runtime {
 
     @Override
     public CompletableFuture<FunctionStatus> getFunctionStatus() {
+        CompletableFuture<FunctionStatus> statsFuture = new 
CompletableFuture<>();
         if (!isAlive()) {
             FunctionStatus.Builder functionStatusBuilder = 
FunctionStatus.newBuilder();
             functionStatusBuilder.setRunning(false);
             
functionStatusBuilder.setFailureException(getDeathException().getMessage());
-            return 
CompletableFuture.completedFuture(functionStatusBuilder.build());
+            statsFuture.complete(functionStatusBuilder.build());
+            return statsFuture;
         }
         FunctionStatus.Builder functionStatusBuilder = 
javaInstanceRunnable.getFunctionStatus();
         functionStatusBuilder.setRunning(true);
-        return 
CompletableFuture.completedFuture(functionStatusBuilder.build());
+        getMetrics().handle((metrics, e) -> {
+            if (e == null) {
+                functionStatusBuilder.setMetrics(metrics);
+            }
+            statsFuture.complete(functionStatusBuilder.build());
+            return null;
+        });
+        return statsFuture;
     }
 
     @Override

Reply via email to