This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5f779b4 Add function metrics with function-stats to get metrics on-demand (#2130) 5f779b4 is described below commit 5f779b4ce541e1201354ca5454bebf1276e93f0b Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Jul 16 14:00:07 2018 -0700 Add function metrics with function-stats to get metrics on-demand (#2130) --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 84 ++++++++++++++++++++-- .../functions/instance/JavaInstanceRunnable.java | 22 ++++-- .../src/main/python/InstanceCommunication_pb2.py | 44 +++++++----- .../instance/src/main/python/python_instance.py | 1 + .../src/main/proto/InstanceCommunication.proto | 1 + .../pulsar/functions/runtime/ThreadRuntime.java | 13 +++- 6 files changed, 133 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5d70525..3ece269 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -48,22 +48,26 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.WorkerServer; @@ -244,6 +248,7 @@ public class PulsarSinkE2ETest { final String sinkTopic = "persistent://" + replNamespace + "/output"; final String propertyKey = "key"; final String propertyValue = "value"; + final String functionName = "PulsarSink-test"; admin.namespaces().createNamespace(replNamespace); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -254,7 +259,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test", + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, sinkTopic); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); @@ -298,7 +303,76 @@ public class PulsarSinkE2ETest { } - protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) { + + @Test(timeOut = 20000) + public void testPulsarSinkStats() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String functionName = "PulsarSink-test"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, + sinkTopic); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + + // try to update function to test: update-function functionality + admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "my-message-" + i; + producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() + .next(); + return subStats.unackedMessages == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 500); + + FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); + functionRuntimeManager.updateRates(); + FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion, + functionName); + + int numInstances = functionStats.getFunctionStatusListCount(); + Assert.assertEquals(numInstances, 1); + + FunctionStatus stats = functionStats.getFunctionStatusListList().get(0); + Map<String, DataDigest> metricsData = stats.getMetrics().getMetricsMap(); + + double count = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount(); + double success = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount(); + Assert.assertEquals((int) count, totalMsgs); + Assert.assertEquals((int) success, totalMsgs); + } + + protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic) { File file = new File(jarFile); try { @@ -312,7 +386,7 @@ public class PulsarSinkE2ETest { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); functionDetailsBuilder.setTenant(tenant); functionDetailsBuilder.setNamespace(namespace); - functionDetailsBuilder.setName(sinkName); + functionDetailsBuilder.setName(functionName); functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); functionDetailsBuilder.setParallelism(1); functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4c29232..33006b2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -108,6 +108,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Source source; private Sink sink; + + public static final String METRICS_TOTAL_PROCESSED = "__total_processed__"; + public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__"; + public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__"; + public static final String METRICS_TOTAL_USER_EXCEPTION = "__total_user_exceptions__"; + public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = "__total_deserialization_exceptions__"; + public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = "__total_serialization_exceptions__"; + public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__"; public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, @@ -418,17 +426,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - addSystemMetrics("__total_processed__", stats.getStats().getTotalProcessed(), bldr); - addSystemMetrics("__total_successfully_processed__", stats.getStats().getTotalSuccessfullyProcessed(), + addSystemMetrics(METRICS_TOTAL_PROCESSED, stats.getStats().getTotalProcessed(), bldr); + addSystemMetrics(METRICS_TOTAL_SUCCESS, stats.getStats().getTotalSuccessfullyProcessed(), bldr); - addSystemMetrics("__total_system_exceptions__", stats.getStats().getTotalSystemExceptions(), bldr); - addSystemMetrics("__total_user_exceptions__", stats.getStats().getTotalUserExceptions(), bldr); + addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, stats.getStats().getTotalSystemExceptions(), bldr); + addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, stats.getStats().getTotalUserExceptions(), bldr); stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> { - addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr); + addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, count, bldr); }); - addSystemMetrics("__total_serialization_exceptions__", + addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION, stats.getStats().getTotalSerializationExceptions(), bldr); - addSystemMetrics("__avg_latency_ms__", stats.getStats().computeLatency(), bldr); + addSystemMetrics(METRICS_AVG_LATENCY, stats.getStats().computeLatency(), bldr); return bldr; } diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py index 49c77d6..a16399e 100644 --- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py +++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='InstanceCommunication.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] + serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] , dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) @@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=606, - serialized_end=675, + serialized_start=643, + serialized_end=712, ) _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor( @@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=677, - serialized_end=741, + serialized_start=714, + serialized_end=778, ) _FUNCTIONSTATUS = _descriptor.Descriptor( @@ -225,6 +225,13 @@ _FUNCTIONSTATUS = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='metrics', full_name='proto.FunctionStatus.metrics', index=14, + number=15, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -238,7 +245,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor( oneofs=[ ], serialized_start=68, - serialized_end=741, + serialized_end=778, ) @@ -268,8 +275,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=743, - serialized_end=814, + serialized_start=780, + serialized_end=851, ) @@ -320,8 +327,8 @@ _METRICSDATA_DATADIGEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=882, - serialized_end=948, + serialized_start=919, + serialized_end=985, ) _METRICSDATA_METRICSENTRY = _descriptor.Descriptor( @@ -357,8 +364,8 @@ _METRICSDATA_METRICSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=950, - serialized_end=1027, + serialized_start=987, + serialized_end=1064, ) _METRICSDATA = _descriptor.Descriptor( @@ -387,8 +394,8 @@ _METRICSDATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=817, - serialized_end=1027, + serialized_start=854, + serialized_end=1064, ) @@ -418,8 +425,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1029, - serialized_end=1065, + serialized_start=1066, + serialized_end=1102, ) _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS @@ -427,6 +434,7 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION _FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION _FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY +_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA _FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS _METRICSDATA_DATADIGEST.containing_type = _METRICSDATA _METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = _METRICSDATA_DATADIGEST @@ -512,8 +520,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=1068, - serialized_end=1416, + serialized_start=1105, + serialized_end=1453, methods=[ _descriptor.MethodDescriptor( name='GetFunctionStatus', diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 70b78f0..bb16253 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -346,6 +346,7 @@ class PythonInstance(object): status.serializationExceptions = self.total_stats.nserialization_exceptions status.averageLatency = self.total_stats.compute_latency() status.lastInvocationTime = self.total_stats.lastinvocationtime + status.metrics.CopyFrom(self.get_metrics()) return status def join(self): diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index ed8c95a..1a80f5e 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -49,6 +49,7 @@ message FunctionStatus { // expressed in ms since epoch int64 lastInvocationTime = 13; string instanceId = 14; + MetricsData metrics = 15; } message FunctionStatusList { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 3b53fb8..5cb8ce0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -98,15 +98,24 @@ class ThreadRuntime implements Runtime { @Override public CompletableFuture<FunctionStatus> getFunctionStatus() { + CompletableFuture<FunctionStatus> statsFuture = new CompletableFuture<>(); if (!isAlive()) { FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); functionStatusBuilder.setRunning(false); functionStatusBuilder.setFailureException(getDeathException().getMessage()); - return CompletableFuture.completedFuture(functionStatusBuilder.build()); + statsFuture.complete(functionStatusBuilder.build()); + return statsFuture; } FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus(); functionStatusBuilder.setRunning(true); - return CompletableFuture.completedFuture(functionStatusBuilder.build()); + getMetrics().handle((metrics, e) -> { + if (e == null) { + functionStatusBuilder.setMetrics(metrics); + } + statsFuture.complete(functionStatusBuilder.build()); + return null; + }); + return statsFuture; } @Override