This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 859c914 adding windowed metrics for functions (#3021) 859c914 is described below commit 859c914a7a3f3ddb9f89c62eba3c048039bd852a Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Nov 21 11:08:09 2018 -0800 adding windowed metrics for functions (#3021) * adding windowed metrics for functions * adding license headers and cleaning up * remove unnecessary import * add RestException * fixing bugs and refactoring code * fix bug in instanceCache * fix bug * add test for stats and fix minor bug --- .../pulsar/broker/admin/impl/FunctionsBase.java | 4 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 97 ++++++++- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 5 +- .../pulsar/common/policies/data/FunctionStats.java | 30 ++- .../functions/instance/FunctionStatsManager.java | 239 +++++++++++++++++++-- .../pulsar/functions/instance/InstanceCache.java | 51 +++++ .../functions/instance/JavaInstanceRunnable.java | 89 +++++--- .../src/main/python/InstanceCommunication_pb2.py | 80 +++++-- .../instance/src/main/python/function_stats.py | 132 +++++++++++- .../instance/src/main/python/python_instance.py | 83 ++++--- pulsar-functions/instance/src/main/python/util.py | 19 ++ .../src/main/proto/InstanceCommunication.proto | 12 ++ .../pulsar/functions/runtime/RuntimeSpawner.java | 2 +- .../pulsar/functions/runtime/ThreadRuntime.java | 46 ++-- .../functions/runtime/ThreadRuntimeFactory.java | 4 + .../org/apache/pulsar/functions/utils/Utils.java | 13 ++ .../functions/worker/FunctionRuntimeManager.java | 27 ++- .../pulsar/functions/worker/MembershipManager.java | 4 +- .../pulsar/functions/worker/SchedulerManager.java | 5 +- .../org/apache/pulsar/functions/worker/Utils.java | 20 +- .../functions/worker/rest/RestException.java | 78 +++++++ .../functions/worker/rest/api/FunctionsImpl.java | 46 ++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 12 +- .../worker/FunctionRuntimeManagerTest.java | 7 +- .../worker/FunctionStatsGeneratorTest.java | 1 + .../functions/worker/SchedulerManagerTest.java | 33 +-- .../pulsar/sql/presto/PulsarConnectorCache.java | 10 +- 27 files changed, 925 insertions(+), 224 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 6f7e43e..84cf8e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -190,7 +190,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) @Path("/{tenant}/{namespace}/{functionName}/stats") - public Response getFunctionStats(final @PathParam("tenant") String tenant, + public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); @@ -206,7 +206,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats") - public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant, + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index e683b6d..4dbdf3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -32,6 +32,7 @@ import java.net.URL; import java.util.*; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -357,13 +359,102 @@ public class PulsarFunctionE2ETest { }, 5, 200); FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); - FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion, + FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion, + functionName, null); + + FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion, + functionName); + + assertEquals(functionStats, functionStatsFromAdmin); + + assertEquals(functionStats.getReceivedTotal(), totalMsgs); + assertEquals(functionStats.getProcessedSuccessfullyTotal(), totalMsgs); + assertEquals(functionStats.getSystemExceptionsTotal(), 0); + assertEquals(functionStats.getUserExceptionsTotal(), 0); + assertTrue(functionStats.avgProcessLatency > 0); + assertEquals(functionStats.oneMin.getReceivedTotal(), totalMsgs); + assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), totalMsgs); + assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0); + assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0); + assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0); + assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency()); + assertTrue(functionStats.getLastInvocation() > 0); + + assertEquals(functionStats.instances.size(), 1); + assertEquals(functionStats.instances.get(0).getInstanceId(), 0); + assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), totalMsgs); + assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), totalMsgs); + assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0); + assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 0); + assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0); + assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(), totalMsgs); + assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(), totalMsgs); + assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(), 0); + assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(), 0); + assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency() > 0); + + assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()); + assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency()); + } + + @Test(timeOut = 20000) + public void testPulsarFunctionStatus() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, + "my.*", sinkTopic, subscriptionName); + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + + // try to update function to test: update-function functionality + admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "my-message-" + i; + producer.newMessage().property(propertyKey, propertyValue).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); + FunctionStatusList functionStatus = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion, functionName, null); - int numInstances = functionStats.getFunctionStatusListCount(); + int numInstances = functionStatus.getFunctionStatusListCount(); assertEquals(numInstances, 1); - FunctionStatus stats = functionStats.getFunctionStatusListList().get(0); + FunctionStatus stats = functionStatus.getFunctionStatusListList().get(0); double count = stats.getNumProcessed(); double success = stats.getNumSuccessfullyProcessed(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 3a0d1e1..bfadbe4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -659,11 +659,10 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); if (isBlank(instanceId)) { - System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName))); + print(admin.functions().getFunctionStats(tenant, namespace, functionName)); } else { - System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId)))); + print(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId))); } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java index 99e52e5..ba274c1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.policies.data; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import lombok.Data; import java.util.HashMap; @@ -27,6 +29,7 @@ import java.util.Map; import java.util.function.Consumer; @Data +@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "instances" }) public class FunctionStats { /** @@ -54,20 +57,24 @@ public class FunctionStats { **/ public double avgProcessLatency; + @JsonProperty("1min") + public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStats.FunctionInstanceStatsDataBase(); + /** * Timestamp of when the function was last invoked by any instance **/ public long lastInvocation; @Data + @JsonPropertyOrder({ "instanceId", "metrics" }) public static class FunctionInstanceStats { /** Instance Id of function instance **/ public int instanceId; @Data - public static class FunctionInstanceStatsData { - + @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" }) + public static class FunctionInstanceStatsDataBase { /** * Total number of records function received from source for instance **/ @@ -92,6 +99,14 @@ public class FunctionStats { * Average process latency for function for instance **/ public double avgProcessLatency; + } + + @Data + @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" }) + public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataBase { + + @JsonProperty("1min") + public FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStatsDataBase(); /** * Timestamp of when the function was last invoked for instance @@ -125,6 +140,13 @@ public class FunctionStats { systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal; userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal; avgProcessLatency += functionInstanceStatsData.avgProcessLatency; + + oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal; + oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal; + oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal; + oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal; + oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency; + if (functionInstanceStatsData.lastInvocation > lastInvocation) { lastInvocation = functionInstanceStatsData.lastInvocation; } @@ -133,6 +155,10 @@ public class FunctionStats { }); // calculate average from sum avgProcessLatency = avgProcessLatency / instances.size(); + + // calculate 1min average from sum + oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size(); + return this; } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java index 388efb1..4d66422 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java @@ -28,13 +28,17 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.proto.InstanceCommunication; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * Function stats. */ @Slf4j @Getter @Setter -public class FunctionStatsManager { +public class FunctionStatsManager implements AutoCloseable { static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"}; @@ -50,6 +54,13 @@ public class FunctionStatsManager { public static final String LAST_INVOCATION = "last_invocation"; public static final String RECEIVED_TOTAL = "received_total"; + public static final String PROCESSED_TOTAL_1min = "processed_total_1min"; + public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = "processed_successfully_total_1min"; + public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min"; + public static final String USER_EXCEPTIONS_TOTAL_1min = "user_exceptions_total_1min"; + public static final String PROCESS_LATENCY_MS_1min = "process_latency_ms_1min"; + public static final String RECEIVED_TOTAL_1min = "received_total_1min"; + /** Declare Prometheus stats **/ final Counter statTotalProcessed; @@ -65,18 +76,33 @@ public class FunctionStatsManager { final Gauge statlastInvocation; final Counter statTotalRecordsRecieved; + + // windowed metrics + + final Counter statTotalProcessed1min; + + final Counter statTotalProcessedSuccessfully1min; + + final Counter statTotalSysExceptions1min; + + final Counter statTotalUserExceptions1min; - CollectorRegistry functionCollectorRegistry; + final Summary statProcessLatency1min; + + final Counter statTotalRecordsRecieved1min; + + private String[] metricsLabels; + + private ScheduledFuture<?> scheduledFuture; @Getter private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); @Getter private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); - public FunctionStatsManager(CollectorRegistry collectorRegistry) { - // Declare function local collector registry so that it will not clash with other function instances' - // metrics collection especially in threaded mode - functionCollectorRegistry = new CollectorRegistry(); + public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { + + this.metricsLabels = metricsLabels; statTotalProcessed = Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL) @@ -114,7 +140,7 @@ public class FunctionStatsManager { statlastInvocation = Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION) - .help("The timestamp of the last invocation of the function") + .help("The timestamp of the last invocation of the function.") .labelNames(metricsLabelNames) .register(collectorRegistry); @@ -123,6 +149,57 @@ public class FunctionStatsManager { .help("Total number of messages received from source.") .labelNames(metricsLabelNames) .register(collectorRegistry); + + statTotalProcessed1min = Counter.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min) + .help("Total number of messages processed in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + statTotalProcessedSuccessfully1min = Counter.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min) + .help("Total number of messages processed successfully in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + statTotalSysExceptions1min = Counter.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) + .help("Total number of system exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + statTotalUserExceptions1min = Counter.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min) + .help("Total number of user exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + statProcessLatency1min = Summary.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min) + .help("Process latency in milliseconds in the last 1 minute.") + .quantile(0.5, 0.01) + .quantile(0.9, 0.01) + .quantile(0.99, 0.01) + .quantile(0.999, 0.01) + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + statTotalRecordsRecieved1min = Counter.build() + .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min) + .help("Total number of messages received from source in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + + scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + reset(); + } catch (Exception e) { + log.error("Failed to reset metrics for 1min window", e); + } + } + }, 1, 1, TimeUnit.MINUTES); } public void addUserException(Exception ex) { @@ -140,15 +217,149 @@ public class FunctionStatsManager { } + public void incrTotalReceived() { + statTotalRecordsRecieved.labels(metricsLabels).inc(); + statTotalRecordsRecieved1min.labels(metricsLabels).inc(); + } + + public void incrTotalProcessed() { + statTotalProcessed.labels(metricsLabels).inc(); + statTotalProcessed1min.labels(metricsLabels).inc(); + } + + public void incrTotalProcessedSuccessfully() { + statTotalProcessedSuccessfully.labels(metricsLabels).inc(); + statTotalProcessedSuccessfully1min.labels(metricsLabels).inc(); + } + + public void incrSysExceptions(Throwable sysException) { + statTotalSysExceptions.labels(metricsLabels).inc(); + statTotalSysExceptions1min.labels(metricsLabels).inc(); + addSystemException(sysException); + } + + public void incrUserExceptions(Exception userException) { + statTotalUserExceptions.labels(metricsLabels).inc(); + statTotalUserExceptions1min.labels(metricsLabels).inc(); + addUserException(userException); + } + + public void setLastInvocation(long ts) { + statlastInvocation.labels(metricsLabels).set(ts); + } + + private Long processTimeStart; + public void processTimeStart() { + processTimeStart = System.nanoTime(); + } + + public void processTimeEnd() { + if (processTimeStart != null) { + double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D; + statProcessLatency.labels(metricsLabels).observe(endTimeMs); + statProcessLatency1min.labels(metricsLabels).observe(endTimeMs); + } + } + + public double getTotalProcessed() { + return statTotalProcessed.labels(metricsLabels).get(); + } + + public double getTotalProcessedSuccessfully() { + return statTotalProcessedSuccessfully.labels(metricsLabels).get(); + } + + public double getTotalRecordsReceived() { + return statTotalRecordsRecieved.labels(metricsLabels).get(); + } + + public double getTotalSysExceptions() { + return statTotalSysExceptions.labels(metricsLabels).get(); + } + + public double getTotalUserExceptions() { + return statTotalUserExceptions.labels(metricsLabels).get(); + } + + public double getLastInvocation() { + return statlastInvocation.labels(metricsLabels).get(); + } + + public double getAvgProcessLatency() { + return statProcessLatency.labels(metricsLabels).get().count <= 0.0 + ? 0 : statProcessLatency.labels(metricsLabels).get().sum / statProcessLatency.labels(metricsLabels).get().count; + } + + public double getProcessLatency50P() { + return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.5); + } + + public double getProcessLatency90P() { + return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.9); + } + + public double getProcessLatency99P() { + return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.99); + } + + public double getProcessLatency99_9P() { + return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999); + } + + public double getTotalProcessed1min() { + return statTotalProcessed1min.labels(metricsLabels).get(); + } + + public double getTotalProcessedSuccessfully1min() { + return statTotalProcessedSuccessfully1min.labels(metricsLabels).get(); + } + + public double getTotalRecordsReceived1min() { + return statTotalRecordsRecieved1min.labels(metricsLabels).get(); + } + + public double getTotalSysExceptions1min() { + return statTotalSysExceptions1min.labels(metricsLabels).get(); + } + + public double getTotalUserExceptions1min() { + return statTotalUserExceptions1min.labels(metricsLabels).get(); + } + + public double getAvgProcessLatency1min() { + return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0 + ? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / statProcessLatency1min.labels(metricsLabels).get().count; + } + + public double getProcessLatency50P1min() { + return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.5); + } + + public double getProcessLatency90P1min() { + return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.9); + } + + public double getProcessLatency99P1min() { + return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.99); + } + + public double getProcessLatency99_9P1min() { + return statProcessLatency1min.labels(metricsLabels).get().quantiles.get(0.999); + } + public void reset() { - statTotalProcessed.clear(); - statTotalProcessedSuccessfully.clear(); - statTotalSysExceptions.clear(); - statTotalUserExceptions.clear(); - statProcessLatency.clear(); - statlastInvocation.clear(); - statTotalRecordsRecieved.clear(); + statTotalProcessed1min.clear(); + statTotalProcessedSuccessfully1min.clear(); + statTotalSysExceptions1min.clear(); + statTotalUserExceptions1min.clear(); + statProcessLatency1min.clear(); + statTotalRecordsRecieved1min.clear(); latestUserExceptions.clear(); latestSystemExceptions.clear(); } + + @Override + public void close() { + scheduledFuture.cancel(false); + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java new file mode 100644 index 0000000..7a6b2ca --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public class InstanceCache { + + private static InstanceCache instance; + + public final ScheduledExecutorService executor; + + private InstanceCache() { + executor = Executors.newSingleThreadScheduledExecutor();; + } + + public static InstanceCache getInstanceCache() { + synchronized (InstanceCache.class) { + if (instance == null) { + instance = new InstanceCache(); + } + } + return instance; + } + + public static void shutdown() { + synchronized (InstanceCache.class) { + if (instance != null) { + instance.executor.shutdown(); + } + instance = null; + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 3a54455..70ed588 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -23,8 +23,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Summary; -import javax.swing.tree.ExpandVetoException; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -62,9 +60,11 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.StateUtils; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; @@ -110,7 +110,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Throwable deathException; // function stats - private final FunctionStatsManager stats; + private FunctionStatsManager stats; private Record<?> currentRecord; @@ -122,6 +122,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private CollectorRegistry collectorRegistry; private final String[] metricsLabels; + private InstanceCache instanceCache; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -134,7 +136,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; - this.stats = new FunctionStatsManager(collectorRegistry); this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; this.metricsLabels = new String[]{ @@ -147,6 +148,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName() }; + + // Declare function local collector registry so that it will not clash with other function instances' + // metrics collection especially in threaded mode + // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down + this.collectorRegistry = collectorRegistry; } /** @@ -199,10 +205,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { */ @Override public void run() { - String functionName = null; try { + this.instanceCache = InstanceCache.getInstanceCache(); + + if (this.collectorRegistry == null) { + this.collectorRegistry = new CollectorRegistry(); + } + this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.executor); + ContextImpl contextImpl = setupContext(); - functionName = String.format("%s-%s", contextImpl.getTenant(), contextImpl.getFunctionName()); javaInstance = setupJavaInstance(contextImpl); if (null != stateTable) { StateContextImpl stateContext = new StateContextImpl(stateTable); @@ -212,7 +223,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { currentRecord = readInput(); // increment number of records received from source - stats.statTotalRecordsRecieved.labels(metricsLabels).inc(); + stats.incrTotalReceived(); if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions .proto.Function.ProcessingGuarantees.ATMOST_ONCE) { @@ -225,18 +236,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { JavaExecutionResult result; // set last invocation time - stats.statlastInvocation.labels(metricsLabels).set(System.currentTimeMillis()); + stats.setLastInvocation(System.currentTimeMillis()); // start time for process latency stat - Summary.Timer requestTimer = stats.statProcessLatency.labels(metricsLabels).startTimer(); + stats.processTimeStart(); // process the message result = javaInstance.handleMessage(currentRecord, currentRecord.getValue()); // register end time - requestTimer.observeDuration(); + stats.processTimeEnd(); // increment total processed - stats.statTotalProcessed.labels(metricsLabels).inc(); + stats.incrTotalProcessed(); removeLogTopicHandler(); @@ -252,10 +263,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } } catch (Throwable t) { - log.error("[{}] Uncaught exception in Java Instance", functionName, t); + log.error("[{}] Uncaught exception in Java Instance", Utils.getFullyQualifiedInstanceId( + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), + instanceConfig.getInstanceId()), t); deathException = t; - stats.statTotalSysExceptions.labels(metricsLabels).inc(); - stats.addSystemException(t); + stats.incrSysExceptions(t); return; } finally { log.info("Closing instance"); @@ -357,8 +371,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { JavaExecutionResult result) throws Exception { if (result.getUserException() != null) { log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException()); - stats.statTotalUserExceptions.labels(metricsLabels).inc(); - stats.addUserException(result.getUserException() ); + stats.incrUserExceptions(result.getUserException()); srcRecord.fail(); } else { if (result.getResult() != null) { @@ -370,7 +383,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } // increment total successfully processed - stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc(); + stats.incrTotalProcessedSuccessfully(); } } @@ -403,6 +416,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Override public void close() { + + if (stats != null) { + stats.close(); + } + if (source != null) { try { source.close(); @@ -465,35 +483,38 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - bldr.setProcessedTotal((long) stats.statTotalProcessed.labels(metricsLabels).get()); - bldr.setProcessedSuccessfullyTotal((long) stats.statTotalProcessedSuccessfully.labels(metricsLabels).get()); - bldr.setSystemExceptionsTotal((long) stats.statTotalSysExceptions.labels(metricsLabels).get()); - bldr.setUserExceptionsTotal((long) stats.statTotalUserExceptions.labels(metricsLabels).get()); - bldr.setReceivedTotal((long) stats.statTotalRecordsRecieved.labels(metricsLabels).get()); - bldr.setAvgProcessLatency(stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0 - ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count); - bldr.setLastInvocation((long) stats.statlastInvocation.labels(metricsLabels).get()); + bldr.setProcessedTotal((long) stats.getTotalProcessed()); + bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully()); + bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions()); + bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions()); + bldr.setReceivedTotal((long) stats.getTotalRecordsReceived()); + bldr.setAvgProcessLatency(stats.getAvgProcessLatency()); + bldr.setLastInvocation((long) stats.getLastInvocation()); + + bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min()); + bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min()); + bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min()); + bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min()); + bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min()); + bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min()); return bldr; } public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setNumProcessed((long) stats.statTotalProcessed.labels(metricsLabels).get()); - functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.statTotalProcessedSuccessfully.labels(metricsLabels).get()); - functionStatusBuilder.setNumUserExceptions((long) stats.statTotalUserExceptions.labels(metricsLabels).get()); + functionStatusBuilder.setNumProcessed((long) stats.getTotalProcessed()); + functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); + functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); stats.getLatestUserExceptions().forEach(ex -> { functionStatusBuilder.addLatestUserExceptions(ex); }); - functionStatusBuilder.setNumSystemExceptions((long) stats.statTotalSysExceptions.labels(metricsLabels).get()); + functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); stats.getLatestSystemExceptions().forEach(ex -> { functionStatusBuilder.addLatestSystemExceptions(ex); }); - functionStatusBuilder.setAverageLatency( - stats.statProcessLatency.labels(metricsLabels).get().count == 0.0 - ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency - .labels(metricsLabels).get().count); - functionStatusBuilder.setLastInvocationTime((long) stats.statlastInvocation.labels(metricsLabels).get()); + functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); + functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); return functionStatusBuilder; } diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py index 6c843dc..f88e6a3 100644 --- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py +++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='InstanceCommunication.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] + serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] , dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) @@ -320,8 +320,8 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1117, - serialized_end=1167, + serialized_start=1317, + serialized_end=1367, ) _METRICSDATA = _descriptor.Descriptor( @@ -339,49 +339,91 @@ _METRICSDATA = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='processedTotal', full_name='proto.MetricsData.processedTotal', index=1, + name='receivedTotal_1min', full_name='proto.MetricsData.receivedTotal_1min', index=1, + number=10, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='processedTotal', full_name='proto.MetricsData.processedTotal', index=2, number=3, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2, + name='processedTotal_1min', full_name='proto.MetricsData.processedTotal_1min', index=3, + number=11, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=4, number=4, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=3, + name='processedSuccessfullyTotal_1min', full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=5, + number=12, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=6, number=5, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=4, + name='systemExceptionsTotal_1min', full_name='proto.MetricsData.systemExceptionsTotal_1min', index=7, + number=13, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=8, number=6, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=5, + name='userExceptionsTotal_1min', full_name='proto.MetricsData.userExceptionsTotal_1min', index=9, + number=14, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=10, number=7, type=1, cpp_type=5, label=1, has_default_value=False, default_value=float(0), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=6, + name='avgProcessLatency_1min', full_name='proto.MetricsData.avgProcessLatency_1min', index=11, + number=15, type=1, cpp_type=5, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=12, number=8, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='userMetrics', full_name='proto.MetricsData.userMetrics', index=7, + name='userMetrics', full_name='proto.MetricsData.userMetrics', index=13, number=9, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -400,7 +442,7 @@ _METRICSDATA = _descriptor.Descriptor( oneofs=[ ], serialized_start=850, - serialized_end=1167, + serialized_end=1367, ) @@ -430,8 +472,8 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1169, - serialized_end=1205, + serialized_start=1369, + serialized_end=1405, ) @@ -475,8 +517,8 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1268, - serialized_end=1360, + serialized_start=1468, + serialized_end=1560, ) _METRICS = _descriptor.Descriptor( @@ -505,8 +547,8 @@ _METRICS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1208, - serialized_end=1360, + serialized_start=1408, + serialized_end=1560, ) _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS @@ -608,8 +650,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=1363, - serialized_end=1711, + serialized_start=1563, + serialized_end=1911, methods=[ _descriptor.MethodDescriptor( name='GetFunctionStatus', diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index ff57509..f720a41 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -19,6 +19,7 @@ import traceback import time +import util from prometheus_client import Counter, Summary, Gauge @@ -37,6 +38,13 @@ class Stats(object): LAST_INVOCATION = 'last_invocation' TOTAL_RECEIVED = 'received_total' + TOTAL_PROCESSED_1min = 'processed_total_1min' + TOTAL_SUCCESSFULLY_PROCESSED_1min = 'processed_successfully_total_1min' + TOTAL_SYSTEM_EXCEPTIONS_1min = 'system_exceptions_total_1min' + TOTAL_USER_EXCEPTIONS_1min = 'user_exceptions_total_1min' + PROCESS_LATENCY_MS_1min = 'process_latency_ms_1min' + TOTAL_RECEIVED_1min = 'received_total_1min' + # Declare Prometheus stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names) stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED, @@ -52,9 +60,116 @@ class Stats(object): stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names) + + # 1min windowed metrics + stat_total_processed_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED_1min, + 'Total number of messages processed in the last 1 minute.', metrics_label_names) + stat_total_processed_successfully_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min, + 'Total number of messages processed successfully in the last 1 minute.', metrics_label_names) + stat_total_sys_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SYSTEM_EXCEPTIONS_1min, + 'Total number of system exceptions in the last 1 minute.', + metrics_label_names) + stat_total_user_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_USER_EXCEPTIONS_1min, + 'Total number of user exceptions in the last 1 minute.', + metrics_label_names) + + stat_process_latency_ms_1min = Summary(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min, + 'Process latency in milliseconds in the last 1 minute.', metrics_label_names) + + stat_total_received_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED_1min, + 'Total number of messages received from source in the last 1 minute.', metrics_label_names) + latest_user_exception = [] latest_sys_exception = [] + def __init__(self, metrics_labels): + self.metrics_labels = metrics_labels; + self.process_start_time = None + + # start time for windowed metrics + util.FixedTimer(60, self.reset).start() + + def get_total_received(self): + return self.stat_total_received.labels(*self.metrics_labels)._value.get(); + + def get_total_processed(self): + return self.stat_total_processed.labels(*self.metrics_labels)._value.get(); + + def get_total_processed_successfully(self): + return self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(); + + def get_total_sys_exceptions(self): + return self.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(); + + def get_total_user_exceptions(self): + return self.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(); + + def get_avg_process_latency(self): + process_latency_ms_count = self.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() + process_latency_ms_sum = self.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() + return 0.0 \ + if process_latency_ms_count <= 0.0 \ + else process_latency_ms_sum / process_latency_ms_count + + def get_total_received_1min(self): + return self.stat_total_received_1min.labels(*self.metrics_labels)._value.get(); + + def get_total_processed_1min(self): + return self.stat_total_processed_1min.labels(*self.metrics_labels)._value.get(); + + def get_total_processed_successfully_1min(self): + return self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get(); + + def get_total_sys_exceptions_1min(self): + return self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get(); + + def get_total_user_exceptions_1min(self): + return self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get(); + + def get_avg_process_latency_1min(self): + process_latency_ms_count = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get() + process_latency_ms_sum = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.get() + return 0.0 \ + if process_latency_ms_count <= 0.0 \ + else process_latency_ms_sum / process_latency_ms_count + + def get_last_invocation(self): + return self.stat_last_invocation.labels(*self.metrics_labels)._value.get() + + def incr_total_processed(self): + self.stat_total_processed.labels(*self.metrics_labels).inc() + self.stat_total_processed_1min.labels(*self.metrics_labels).inc() + + def incr_total_processed_successfully(self): + self.stat_total_processed_successfully.labels(*self.metrics_labels).inc() + self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc() + + def incr_total_sys_exceptions(self): + self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc() + self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc() + self.add_sys_exception() + + def incr_total_user_exceptions(self): + self.stat_total_user_exceptions.labels(*self.metrics_labels).inc() + self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc() + self.add_user_exception() + + def incr_total_received(self): + self.stat_total_received.labels(*self.metrics_labels).inc() + self.stat_total_received_1min.labels(*self.metrics_labels).inc() + + def process_time_start(self): + self.process_start_time = time.time(); + + def process_time_end(self): + if self.process_start_time: + duration = (time.time() - self.process_start_time) * 1000.0 + self.stat_process_latency_ms.labels(*self.metrics_labels).observe(duration) + self.stat_process_latency_ms_1min.labels(*self.metrics_labels).observe(duration) + + def set_last_invocation(self, time): + self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0) + def add_user_exception(self): self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000))) if len(self.latest_sys_exception) > 10: @@ -65,14 +180,13 @@ class Stats(object): if len(self.latest_sys_exception) > 10: self.latest_sys_exception.pop(0) - def reset(self, metrics_labels): + def reset(self): self.latest_user_exception = [] self.latest_sys_exception = [] - self.stat_total_processed.labels(*metrics_labels)._value.set(0.0) - self.stat_total_processed_successfully.labels(*metrics_labels)._value.set(0.0) - self.stat_total_user_exceptions.labels(*metrics_labels)._value.set(0.0) - self.stat_total_sys_exceptions.labels(*metrics_labels)._value.set(0.0) - self.stat_process_latency_ms.labels(*metrics_labels)._sum.set(0.0) - self.stat_process_latency_ms.labels(*metrics_labels)._count.set(0.0) - self.stat_last_invocation.labels(*metrics_labels).set(0.0) - self.stat_total_received.labels(*metrics_labels)._value.set(0.0) \ No newline at end of file + self.stat_total_processed_1min.labels(*self.metrics_labels)._value.set(0.0) + self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0) + self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0) + self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0) + self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.set(0.0) + self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.set(0.0) + self.stat_total_received_1min.labels(*self.metrics_labels)._value.set(0.0) \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f05e7b2..ec85af1 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -88,7 +88,6 @@ class PythonInstance(object): self.atleast_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE') self.auto_ack = self.instance_config.function_details.autoAck self.contextimpl = None - self.stats = Stats() self.last_health_check_ts = time.time() self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None self.expected_healthcheck_interval = expected_healthcheck_interval @@ -97,6 +96,7 @@ class PythonInstance(object): "%s/%s" % (function_details.tenant, function_details.namespace), "%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name), instance_id, cluster_name] + self.stats = Stats(self.metrics_labels) def health_check(self): self.last_health_check_ts = time.time() @@ -199,31 +199,35 @@ class PythonInstance(object): successfully_executed = False try: # get user function start time for statistic calculation - start_time = time.time() - Stats.stat_last_invocation.labels(*self.metrics_labels).set(start_time * 1000.0) + self.stats.set_last_invocation(time.time()) + + # start timer for process time + self.stats.process_time_start() if self.function_class is not None: output_object = self.function_class.process(input_object, self.contextimpl) else: output_object = self.function_purefunction.process(input_object) successfully_executed = True - Stats.stat_process_latency_ms.labels(*self.metrics_labels).observe((time.time() - start_time) * 1000.0) - Stats.stat_total_processed.labels(*self.metrics_labels).inc() + + # stop timer for process time + self.stats.process_time_end() + + # incr total processed stat + self.stats.incr_total_processed() except Exception as e: Log.exception("Exception while executing user method") - Stats.stat_total_user_exceptions.labels(*self.metrics_labels).inc() - self.stats.add_user_exception() + self.stats.incr_total_user_exceptions(); if self.log_topic_handler is not None: log.remove_all_handlers() log.add_handler(self.saved_log_handler) if successfully_executed: self.process_result(output_object, msg) - Stats.stat_total_processed_successfully.labels(*self.metrics_labels).inc() + self.stats.incr_total_processed_successfully() except Exception as e: Log.error("Uncaught exception in Python instance: %s" % e); - Stats.stat_total_sys_exceptions.labels(*self.metrics_labels).inc() - self.stats.add_sys_exception() + self.stats.incr_total_sys_exceptions() def done_producing(self, consumer, orig_message, result, sent_message): if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once: @@ -269,7 +273,7 @@ class PythonInstance(object): def message_listener(self, serde, consumer, message): # increment number of received records from source - Stats.stat_total_received.labels(*self.metrics_labels).inc() + self.stats.incr_total_received() item = InternalMessage(message, consumer.topic(), serde, consumer) self.queue.put(item, True) if self.atmost_once and self.auto_ack: @@ -282,31 +286,45 @@ class PythonInstance(object): return metrics def reset_metrics(self): - self.stats.reset(self.metrics_labels) + self.stats.reset() self.contextimpl.reset_metrics() def get_metrics(self): - total_received = Stats.stat_total_received.labels(*self.metrics_labels)._value.get() - total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get() - total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get() - total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get() - total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get() - process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() - process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() - last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get() + total_received = self.stats.get_total_received() + total_processed = self.stats.get_total_processed() + total_processed_successfully = self.stats.get_total_processed_successfully() + total_user_exceptions = self.stats.get_total_user_exceptions() + total_sys_exceptions = self.stats.get_total_sys_exceptions() + avg_process_latency_ms = self.stats.get_avg_process_latency() + last_invocation = self.stats.get_last_invocation() + + total_received_1min = self.stats.get_total_received_1min() + total_processed_1min = self.stats.get_total_processed_1min() + total_processed_successfully_1min = self.stats.get_total_processed_successfully_1min() + total_user_exceptions_1min = self.stats.get_total_user_exceptions_1min() + total_sys_exceptions_1min = self.stats.get_total_sys_exceptions_1min() + avg_process_latency_ms_1min = self.stats.get_avg_process_latency_1min() metrics_data = InstanceCommunication_pb2.MetricsData() - + # total metrics metrics_data.receivedTotal = int(total_received) if sys.version_info.major >= 3 else long(total_received) metrics_data.processedTotal = int(total_processed) if sys.version_info.major >= 3 else long(total_processed) metrics_data.processedSuccessfullyTotal = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully) metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions) metrics_data.userExceptionsTotal = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions) - metrics_data.avgProcessLatency = 0.0 \ - if process_latency_ms_count <= 0.0 \ - else process_latency_ms_sum / process_latency_ms_count + metrics_data.avgProcessLatency = avg_process_latency_ms metrics_data.lastInvocation = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) + # 1min metrics + metrics_data.receivedTotal_1min = int(total_received_1min) if sys.version_info.major >= 3 else long(total_received_1min) + metrics_data.processedTotal_1min = int(total_processed_1min) if sys.version_info.major >= 3 else long(total_processed_1min) + metrics_data.processedSuccessfullyTotal_1min = int( + total_processed_successfully_1min) if sys.version_info.major >= 3 else long(total_processed_successfully_1min) + metrics_data.systemExceptionsTotal_1min = int(total_sys_exceptions_1min) if sys.version_info.major >= 3 else long( + total_sys_exceptions_1min) + metrics_data.userExceptionsTotal_1min = int(total_user_exceptions_1min) if sys.version_info.major >= 3 else long( + total_user_exceptions_1min) + metrics_data.avgProcessLatency_1min = avg_process_latency_ms_1min # get any user metrics user_metrics = self.contextimpl.get_metrics() @@ -325,13 +343,12 @@ class PythonInstance(object): status = InstanceCommunication_pb2.FunctionStatus() status.running = True - total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get() - total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get() - total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get() - total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get() - process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() - process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() - last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get() + total_processed = self.stats.get_total_processed() + total_processed_successfully = self.stats.get_total_processed_successfully() + total_user_exceptions = self.stats.get_total_user_exceptions() + total_sys_exceptions = self.stats.get_total_sys_exceptions() + avg_process_latency_ms = self.stats.get_avg_process_latency() + last_invocation = self.stats.get_last_invocation() status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed) status.numSuccessfullyProcessed = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully) @@ -346,9 +363,7 @@ class PythonInstance(object): to_add = status.latestSystemExceptions.add() to_add.exceptionString = ex to_add.msSinceEpoch = tm - status.averageLatency = 0.0 \ - if process_latency_ms_count <= 0.0 \ - else process_latency_ms_sum / process_latency_ms_count + status.averageLatency = avg_process_latency_ms status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) return status diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 56c1ce1..76f75bd 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -26,6 +26,7 @@ import os import inspect import sys import importlib +from threading import Timer import log @@ -66,3 +67,21 @@ def import_class_from_path(from_path, full_class_name): def getFullyQualifiedFunctionName(tenant, namespace, name): return "%s/%s/%s" % (tenant, namespace, name) + +class FixedTimer(): + + def __init__(self, t, hFunction): + self.t = t + self.hFunction = hFunction + self.thread = Timer(self.t, self.handle_function) + + def handle_function(self): + self.hFunction() + self.thread = Timer(self.t, self.handle_function) + self.thread.start() + + def start(self): + self.thread.start() + + def cancel(self): + self.thread.cancel() \ No newline at end of file diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index d5f5b79..05d1399 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -71,21 +71,33 @@ message MetricsData { // Total number of records function received from source int64 receivedTotal = 2; + int64 receivedTotal_1min = 10; + // Total number of records processed int64 processedTotal = 3; + int64 processedTotal_1min = 11; + // Total number of records successfully processed by user function int64 processedSuccessfullyTotal = 4; + int64 processedSuccessfullyTotal_1min = 12; + // Total number of system exceptions thrown int64 systemExceptionsTotal = 5; + int64 systemExceptionsTotal_1min = 13; + // Total number of user exceptions thrown int64 userExceptionsTotal = 6; + int64 userExceptionsTotal_1min = 14; + // Average process latency for function double avgProcessLatency = 7; + double avgProcessLatency_1min = 15; + // Timestamp of when the function was last invoked int64 lastInvocation = 8; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java index 5688411..6b5abce 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java @@ -94,7 +94,7 @@ public class RuntimeSpawner implements AutoCloseable { runtime.start(); } catch (Exception e) { log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(), - details.getNamespace(), details.getName(), e); + details.getNamespace(), details.getName(), e, e); } numRestarts++; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 913d4ae..460cdb0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -47,7 +47,12 @@ class ThreadRuntime implements Runtime { private InstanceConfig instanceConfig; private JavaInstanceRunnable javaInstanceRunnable; private ThreadGroup threadGroup; - + private FunctionCacheManager fnCache; + private String jarFile; + private PulsarClient pulsarClient; + private String stateStorageServiceUrl; + private SecretsProvider secretsProvider; + private CollectorRegistry collectorRegistry; ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager fnCache, ThreadGroup threadGroup, @@ -61,22 +66,21 @@ class ThreadRuntime implements Runtime { throw new RuntimeException("Thread Container only supports Java Runtime"); } - // if collector registry is not set, create one for this thread. - // since each thread / instance will needs its own collector registry for metrics collection - CollectorRegistry instanceCollectorRegistry = collectorRegistry; - if (instanceCollectorRegistry == null) { - instanceCollectorRegistry = new CollectorRegistry(); - } - - this.javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, - fnCache, - jarFile, - pulsarClient, - stateStorageServiceUrl, - secretsProvider, - instanceCollectorRegistry); this.threadGroup = threadGroup; + this.fnCache = fnCache; + this.jarFile = jarFile; + this.pulsarClient = pulsarClient; + this.stateStorageServiceUrl = stateStorageServiceUrl; + this.secretsProvider = secretsProvider; + this.collectorRegistry = collectorRegistry; + this.javaInstanceRunnable = new JavaInstanceRunnable( + instanceConfig, + fnCache, + jarFile, + pulsarClient, + stateStorageServiceUrl, + secretsProvider, + collectorRegistry); } /** @@ -84,6 +88,16 @@ class ThreadRuntime implements Runtime { */ @Override public void start() { + // re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized + this.javaInstanceRunnable = new JavaInstanceRunnable( + instanceConfig, + fnCache, + jarFile, + pulsarClient, + stateStorageServiceUrl, + secretsProvider, + collectorRegistry); + log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, String.format("%s-%s", diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java index ed76117..a3fd850 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; @@ -115,5 +116,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory { } catch (PulsarClientException e) { log.warn("Failed to close pulsar client when closing function container factory", e); } + + // Shutdown instance cache + InstanceCache.shutdown(); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 8181e9f..7414446 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -336,4 +336,17 @@ public class Utils { } return null; } + + public static String getFullyQualifiedInstanceId(org.apache.pulsar.functions.proto.Function.Instance instance) { + return getFullyQualifiedInstanceId( + instance.getFunctionMetaData().getFunctionDetails().getTenant(), + instance.getFunctionMetaData().getFunctionDetails().getNamespace(), + instance.getFunctionMetaData().getFunctionDetails().getName(), + instance.getInstanceId()); + } + + public static String getFullyQualifiedInstanceId(String tenant, String namespace, + String functionName, int instanceId) { + return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index add4013..98ec5a1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -58,7 +58,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; /** @@ -318,7 +317,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ final String workerId = this.workerConfig.getWorkerId(); if (assignedWorkerId.equals(workerId)) { - stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), restart); + stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), restart); return Response.status(Status.OK).build(); } else { // query other worker @@ -356,7 +355,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ Assignment assignment = assignments.iterator().next(); final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (assignedWorkerId.equals(workerId)) { stopFunction(fullyQualifiedInstanceId, restart); } else { @@ -384,7 +383,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ for (Assignment assignment : assignments) { final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (assignedWorkerId.equals(workerId)) { stopFunction(fullyQualifiedInstanceId, restart); } else { @@ -427,7 +426,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ Map<String, Assignment> assignments = workerIdToAssignments.get(workerId); if (assignments != null) { assignments.values().forEach(assignment -> { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); try { stopFunction(fullyQualifiedInstanceId, false); } catch (Exception e) { @@ -481,10 +480,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ // If I am running worker if (assignedWorkerId.equals(workerId)) { FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( - Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance())); RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); if (runtimeSpawner != null) { - return Utils.getFunctionInstanceStats(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics(); + return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics(); } return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); } else { @@ -619,7 +618,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ // If I am running worker if (assignedWorkerId.equals(workerId)) { FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( - Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance())); RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); if (runtimeSpawner != null) { try { @@ -753,7 +752,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ existingAssignmentMap.putAll(entry); } - if (existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) { + if (existingAssignmentMap.containsKey(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) { updateAssignment(newAssignment); } else { addAssignment(newAssignment); @@ -761,7 +760,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private void updateAssignment(Assignment assignment) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); Assignment existingAssignment = this.findAssignment(assignment); // potential updates need to happen if (!existingAssignment.equals(assignment)) { @@ -812,7 +811,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ @VisibleForTesting void deleteAssignment(Assignment assignment) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId()); if (assignmentMap != null) { if (assignmentMap.containsKey(fullyQualifiedInstanceId)) { @@ -835,7 +834,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private void startFunctionInstance(Assignment assignment) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() .setFunctionInstance(assignment.getInstance())); @@ -888,7 +887,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) { String fullyQualifiedInstanceId - = Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId); + = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId); for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) { Map<String, Assignment> assignmentMap = entry.getValue(); Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId); @@ -914,7 +913,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap<>()); } this.workerIdToAssignments.get(assignment.getWorkerId()).put( - Utils.getFullyQualifiedInstanceId(assignment.getInstance()), + org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), assignment); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index d6e7b47..efe702f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -194,7 +194,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { Map.Entry<Function.Instance, Long> entry = it.next(); String fullyQualifiedFunctionName = FunctionDetailsUtils.getFullyQualifiedName( entry.getKey().getFunctionMetaData().getFunctionDetails()); - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(entry.getKey()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(entry.getKey()); //remove functions that don't exist anymore if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) { it.remove(); @@ -262,7 +262,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { if (currentTimeMs - unassignedDurationMs > this.workerConfig.getRescheduleTimeoutMs()) { needSchedule.add(instance); // remove assignment from failed node - Function.Assignment assignment = assignmentMap.get(Utils.getFullyQualifiedInstanceId(instance)); + Function.Assignment assignment = assignmentMap.get(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(instance)); if (assignment != null) { needRemove.add(assignment); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index c496766..2a93494 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -31,10 +31,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Predicate; import java.util.stream.Collectors; -import lombok.Getter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -49,6 +47,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.Instance; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.scheduler.IScheduler; import com.google.common.annotations.VisibleForTesting; @@ -250,7 +249,7 @@ public class SchedulerManager implements AutoCloseable { private void publishNewAssignment(Assignment assignment, boolean deleted) { try { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id // message producer.newMessage().key(fullyQualifiedInstanceId) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 7defdf6..d6863a7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Predicate; import java.util.stream.Collectors; +import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -198,19 +199,6 @@ public final class Utils { } } - public static String getFullyQualifiedInstanceId(Function.Instance instance) { - return getFullyQualifiedInstanceId( - instance.getFunctionMetaData().getFunctionDetails().getTenant(), - instance.getFunctionMetaData().getFunctionDetails().getNamespace(), - instance.getFunctionMetaData().getFunctionDetails().getName(), - instance.getInstanceId()); - } - - public static String getFullyQualifiedInstanceId(String tenant, String namespace, - String functionName, int instanceId) { - return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); - } - public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) { RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); @@ -234,6 +222,12 @@ public final class Utils { functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()); functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation()); + functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min()); + functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min()); + functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min()); + functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min()); + functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()); + // Filter out values that are NaN Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream() .filter(stringDoubleEntry -> !stringDoubleEntry.getValue().isNaN()) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java new file mode 100644 index 0000000..537ec1d --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestException.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest; + +import java.io.PrintWriter; +import java.io.StringWriter; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.ErrorData; + +/** + * Exception used to provide better error messages to clients of the REST API. + */ +@SuppressWarnings("serial") +public class RestException extends WebApplicationException { + static String getExceptionData(Throwable t) { + StringWriter writer = new StringWriter(); + writer.append("\n --- An unexpected error occurred in the server ---\n\n"); + if (t != null) { + writer.append("Message: ").append(t.getMessage()).append("\n\n"); + } + writer.append("Stacktrace:\n\n"); + + t.printStackTrace(new PrintWriter(writer)); + return writer.toString(); + } + + public RestException(Response.Status status, String message) { + this(status.getStatusCode(), message); + } + + public RestException(int code, String message) { + super(message, Response.status(code).entity(new ErrorData(message)).type(MediaType.APPLICATION_JSON).build()); + } + + public RestException(Throwable t) { + super(getResponse(t)); + } + + public RestException(PulsarAdminException cae) { + this(cae.getStatusCode(), cae.getHttpError()); + } + + private static Response getResponse(Throwable t) { + if (t instanceof RestException + || t instanceof WebApplicationException) { + WebApplicationException e = (WebApplicationException) t; + return e.getResponse(); + } else { + return Response + .status(Status.INTERNAL_SERVER_ERROR) + .entity(getExceptionData(t)) + .type(MediaType.TEXT_PLAIN) + .build(); + } + } +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 627f5cc..474032e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -105,6 +105,7 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.Utils; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; +import org.apache.pulsar.functions.worker.rest.RestException; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -802,33 +803,30 @@ public class FunctionsImpl { private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); - public Response getFunctionStats(final String tenant, final String namespace, final String componentName, + public FunctionStats getFunctionStats(final String tenant, final String namespace, final String componentName, final String componentType, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } // validate parameters try { validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { - log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(e.getMessage())).build(); + log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + throw new RestException(Status.BAD_REQUEST, e.getMessage()); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { - log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); - return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); } FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); if (!calculateSubjectType(functionMetaData).equals(componentType)) { log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); - return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); @@ -838,18 +836,18 @@ public class FunctionsImpl { } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build(); + log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } - return Response.status(Status.OK).entity(gson.toJson(functionStats)).build(); + return functionStats; } - public Response getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName, + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName, final String componentType, String instanceId, URI uri) { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } // validate parameters @@ -857,27 +855,25 @@ public class FunctionsImpl { validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); } catch (IllegalArgumentException e) { log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(e.getMessage())).build(); + throw new RestException(Status.BAD_REQUEST, e.getMessage()); + } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); - return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); } FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); if (!calculateSubjectType(functionMetaData).equals(componentType)) { log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); - return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); + } int instanceIdInt = Integer.parseInt(instanceId); if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) { log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName); - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Invalid InstanceId"))).build(); + throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId)); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); @@ -889,10 +885,10 @@ public class FunctionsImpl { throw we; } catch (Exception e) { log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build(); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } - return Response.status(Status.OK).entity(gson.toJson(functionInstanceStatsData)).build(); + return functionInstanceStatsData; } public Response listFunctions(final String tenant, final String namespace, String componentType) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 3412d2e..ea23d26 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -20,7 +20,6 @@ package org.apache.pulsar.functions.worker.rest.api.v2; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.FunctionStats; -import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -28,7 +27,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam; import org.apache.pulsar.common.io.ConnectorDefinition; import java.io.IOException; import java.io.InputStream; -import java.net.URI; import java.util.List; import javax.ws.rs.Consumes; @@ -131,7 +129,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) @Path("/{tenant}/{namespace}/{functionName}/stats") - public Response getFunctionStats(final @PathParam("tenant") String tenant, + public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); @@ -147,10 +145,10 @@ public class FunctionApiV2Resource extends FunctionApiResource { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats") - public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("instanceId") String instanceId) throws IOException { + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionsInstanceStats( tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 20e4897..651e5d0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.utils.Utils; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -214,7 +215,7 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment1); functionRuntimeManager.processAssignment(assignment2); - functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())); + functionRuntimeManager.deleteAssignment(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance())); verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class)); verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class)); @@ -406,11 +407,11 @@ public class FunctionRuntimeManagerTest { List<Message<byte[]>> messageList = new LinkedList<>(); Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(), new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null)); - doReturn(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(), new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null)); - doReturn(Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); // delete function2 Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(), diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 21a2852..efc2974 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.utils.Utils; import org.testng.Assert; import org.testng.annotations.Test; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index ffd1bd6..465729e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; import org.mockito.invocation.Invocation; @@ -154,7 +155,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -200,7 +201,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -247,7 +248,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -314,9 +315,9 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); //TODO: delete this assignment - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -373,7 +374,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -405,7 +406,7 @@ public class SchedulerManagerTest { Assert.assertEquals(assignments, assignment2); // updating assignments - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); // scale up @@ -484,7 +485,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -538,9 +539,9 @@ public class SchedulerManagerTest { assertTrue(allAssignments.contains(assignment2_3)); // updating assignments - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3); // scale down @@ -665,7 +666,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); @@ -716,9 +717,9 @@ public class SchedulerManagerTest { assertTrue(allAssignments.contains(assignment2_3)); // updating assignments - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); - currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); + currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3); // update field @@ -805,7 +806,7 @@ public class SchedulerManagerTest { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); - assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); Map<String, Function.Assignment> assignmentEntry2 = new HashMap<>(); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index d1b24a2..b510030 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -72,10 +72,12 @@ public class PulsarConnectorCache { } public static void shutdown() throws ManagedLedgerException, InterruptedException { - if (instance != null) { - instance.managedLedgerFactory.shutdown(); - instance.statsProvider.stop(); - instance = null; + synchronized (PulsarConnectorCache.class) { + if (instance != null) { + instance.managedLedgerFactory.shutdown(); + instance.statsProvider.stop(); + instance = null; + } } } }