This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d24102 prometheus metrics for functions served via brokers or function instances should match (#3066) 8d24102 is described below commit 8d241022724fff411f6841ff84b970f01d5dbd18 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Tue Nov 27 14:49:01 2018 -0800 prometheus metrics for functions served via brokers or function instances should match (#3066) * prometheus metrics for functions served via brokers or instances themselves should match * add additional testing --- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 248 +++++++++++++++++++-- .../functions/instance/FunctionStatsManager.java | 15 ++ .../functions/instance/JavaInstanceRunnable.java | 7 +- .../functions/runtime/KubernetesRuntime.java | 7 + .../pulsar/functions/runtime/ProcessRuntime.java | 10 +- .../apache/pulsar/functions/runtime/Runtime.java | 2 + .../pulsar/functions/runtime/RuntimeUtils.java | 21 ++ .../pulsar/functions/runtime/ThreadRuntime.java | 6 + .../functions/worker/FunctionsStatsGenerator.java | 43 +--- .../worker/FunctionStatsGeneratorTest.java | 220 ------------------ 10 files changed, 294 insertions(+), 285 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 9214d25..f2869e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -18,28 +18,9 @@ */ package org.apache.pulsar.io; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; -import static org.mockito.Mockito.spy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static 
org.testng.Assert.assertTrue; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; - -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.URL; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.ToString; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -65,6 +46,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; @@ -76,6 +58,33 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Method; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + /** * Test Pulsar sink on function * @@ -391,6 +400,79 @@ public class PulsarFunctionE2ETest { assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()); assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency()); + // validate prometheus metrics empty + String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); + log.info("prometheus metrics: {}", prometheusMetrics); + + Map<String, Metric> metrics = parseMetrics(prometheusMetrics); + Metric m = metrics.get("pulsar_function_received_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_received_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_user_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_user_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_process_latency_ms"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, Double.NaN); + m = metrics.get("pulsar_function_process_latency_ms_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, Double.NaN); + m = metrics.get("pulsar_function_system_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_system_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), 
config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_last_invocation"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_processed_successfully_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_processed_successfully_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + + // validate function instance stats empty FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion, functionName, 0, null); @@ -464,6 +546,78 @@ public class PulsarFunctionE2ETest { assertEquals(functionInstanceStats, functionInstanceStatsAdmin); assertEquals(functionInstanceStats, 
functionStats.instances.get(0).getMetrics()); + + // validate prometheus metrics + prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); + log.info("prometheus metrics: {}", prometheusMetrics); + + metrics = parseMetrics(prometheusMetrics); + m = metrics.get("pulsar_function_received_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_function_received_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_function_user_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_user_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = 
metrics.get("pulsar_function_process_latency_ms"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_function_process_latency_ms_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_function_system_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_system_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_function_last_invocation"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), 
String.format("%s/%s", tenant, namespacePortion)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_function_processed_successfully_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_function_processed_successfully_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.value, (double) totalMsgs); } @Test(timeOut = 20000) @@ -628,4 +782,58 @@ public class PulsarFunctionE2ETest { producer.close(); } + + public static String getPrometheusMetrics(int metricsPort) throws IOException { + StringBuilder result = new StringBuilder(); + URL url = new URL(String.format("http://%s:%s/metrics", InetAddress.getLocalHost().getHostAddress(), metricsPort)); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); + String line; + while ((line = rd.readLine()) != null) { + result.append(line + System.lineSeparator()); + } + rd.close(); + return result.toString(); + } + + /** + * Hacky parsing of Prometheus text format. 
Should be good enough for unit tests + */ + private static Map<String, Metric> parseMetrics(String metrics) { + Map<String, Metric> parsed = new HashMap<>(); + // Example lines are + // jvm_threads_current{cluster="standalone",} 203.0 + // or + // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", + // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$"); + Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); + Arrays.asList(metrics.split("\n")).forEach(line -> { + if (line.isEmpty() || line.startsWith("#")) { + return; + } + Matcher matcher = pattern.matcher(line); + checkArgument(matcher.matches()); + String name = matcher.group(1); + Metric m = new Metric(); + m.value = Double.valueOf(matcher.group(3)); + String tags = matcher.group(2); + Matcher tagsMatcher = tagsPattern.matcher(tags); + while (tagsMatcher.find()) { + String tag = tagsMatcher.group(1); + String value = tagsMatcher.group(2); + m.tags.put(tag, value); + } + parsed.put(name, m); + }); + return parsed; + } + + @ToString + static class Metric { + Map<String, String> tags = new TreeMap<>(); + double value; + } + } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java index 139208b..c1b7574 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java @@ -23,11 +23,14 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Summary; +import io.prometheus.client.exporter.common.TextFormat; import 
lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.proto.InstanceCommunication; +import java.io.IOException; +import java.io.StringWriter; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -89,6 +92,8 @@ public class FunctionStatsManager implements AutoCloseable { private ScheduledFuture<?> scheduledFuture; + private final CollectorRegistry collectorRegistry; + @Getter private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); @Getter @@ -96,6 +101,8 @@ public class FunctionStatsManager implements AutoCloseable { public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { + this.collectorRegistry = collectorRegistry; + this.metricsLabels = metricsLabels; statTotalProcessedSuccessfully = Counter.build() @@ -326,6 +333,14 @@ public class FunctionStatsManager implements AutoCloseable { latestSystemExceptions.clear(); } + public String getStatsAsString() throws IOException { + StringWriter outputWriter = new StringWriter(); + + TextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples()); + + return outputWriter.toString(); + } + @Override public void close() { scheduledFuture.cancel(false); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 1d70356..a9e9177 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,7 +24,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import 
io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; -import java.util.concurrent.TimeUnit; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -38,11 +37,8 @@ import org.apache.bookkeeper.clients.exceptions.ClientException; import org.apache.bookkeeper.clients.exceptions.InternalServerException; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; -import org.apache.bookkeeper.common.util.Backoff; import org.apache.bookkeeper.common.util.Backoff.Jitter; import org.apache.bookkeeper.common.util.Backoff.Jitter.Type; -import org.apache.bookkeeper.common.util.Backoff.Policy; -import org.apache.bookkeeper.common.util.Retries; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.StorageType; import org.apache.bookkeeper.stream.proto.StreamConfiguration; @@ -69,7 +65,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; -import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.StateUtils; @@ -86,6 +81,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; @@ -119,6 +115,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Throwable deathException; // function stats + @Getter private FunctionStatsManager stats; private Record<?> currentRecord; diff 
--git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 482943b..fd6c4c4 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -47,6 +47,8 @@ import io.kubernetes.client.models.V1ServiceSpec; import io.kubernetes.client.models.V1StatefulSet; import io.kubernetes.client.models.V1StatefulSetSpec; import io.kubernetes.client.models.V1Toleration; + +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -300,6 +302,11 @@ class KubernetesRuntime implements Runtime { } @Override + public String getPrometheusMetrics() throws IOException { + return RuntimeUtils.getPrometheusMetrics(METRICS_PORT); + } + + @Override public boolean isAlive() { return running; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index e39c7ad..4edac17 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -38,6 +38,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.Utils; +import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.TimerTask; @@ -58,6 +59,7 @@ class ProcessRuntime implements Runtime { @Getter private List<String> processArgs; private int instancePort; + private int metricsPort; @Getter private Throwable deathException; 
private ManagedChannel channel; @@ -81,6 +83,7 @@ class ProcessRuntime implements Runtime { Long expectedHealthCheckInterval) throws Exception { this.instanceConfig = instanceConfig; this.instancePort = instanceConfig.getPort(); + this.metricsPort = Utils.findAvailablePort(); this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.secretsProviderConfigurator = secretsProviderConfigurator; String logConfigFile = null; @@ -119,7 +122,7 @@ class ProcessRuntime implements Runtime { false, null, null, - Utils.findAvailablePort()); + this.metricsPort); } /** @@ -268,6 +271,11 @@ class ProcessRuntime implements Runtime { return retval; } + @Override + public String getPrometheusMetrics() throws IOException { + return RuntimeUtils.getPrometheusMetrics(metricsPort); + } + public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() { CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>(); if (stub == null) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java index ac1eced..fafdca7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime; import org.apache.pulsar.functions.proto.InstanceCommunication; +import java.io.IOException; import java.util.concurrent.CompletableFuture; /** @@ -46,4 +47,5 @@ public interface Runtime { CompletableFuture<InstanceCommunication.MetricsData> getMetrics(); + String getPrometheusMetrics() throws IOException; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 05f21e6..a5f55c5 100644 
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -20,6 +20,13 @@ package org.apache.pulsar.functions.runtime; import com.google.protobuf.util.JsonFormat; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URL; import java.util.LinkedList; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -186,4 +193,18 @@ class RuntimeUtils { args.add(instanceConfig.getClusterName()); return args; } + + public static String getPrometheusMetrics(int metricsPort) throws IOException{ + StringBuilder result = new StringBuilder(); + URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort)); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream())); + String line; + while ((line = rd.readLine()) != null) { + result.append(line + System.lineSeparator()); + } + rd.close(); + return result.toString(); + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 460cdb0..be049c3 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.runtime; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import io.prometheus.client.CollectorRegistry; @@ -157,6 +158,11 @@ class ThreadRuntime implements Runtime { } @Override + public String getPrometheusMetrics() 
throws IOException { + return javaInstanceRunnable.getStats().getStatsAsString(); + } + + @Override public CompletableFuture<Void> resetMetrics() { javaInstanceRunnable.resetMetrics(); return CompletableFuture.completedFuture(null); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 2b8e0fd..cd186cc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -18,17 +18,15 @@ */ package org.apache.pulsar.functions.worker; -import org.apache.pulsar.functions.instance.FunctionStatsManager; -import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Map; -import java.util.concurrent.ExecutionException; /** * A class to generate stats for pulsar functions running on this broker @@ -57,31 +55,10 @@ public class FunctionsStatsGenerator { Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); if (functionRuntime != null) { try { - InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get(); - - String tenant = functionRuntimeInfo.getFunctionInstance() - .getFunctionMetaData().getFunctionDetails().getTenant(); - String namespace = functionRuntimeInfo.getFunctionInstance() - .getFunctionMetaData().getFunctionDetails().getNamespace(); - String name = functionRuntimeInfo.getFunctionInstance() - 
.getFunctionMetaData().getFunctionDetails().getName(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - String qualifiedNamespace = String.format("%s/%s", tenant, namespace); - - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency()); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation()); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal()); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal()); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal()); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal()); - for (Map.Entry<String, Double> userMetricsMapEntry : metrics.getUserMetricsMap().entrySet()) { - String userMetricName = userMetricsMapEntry.getKey(); - Double val = userMetricsMapEntry.getValue(); - metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, instanceId, val); - } + out.write(functionRuntime.getPrometheusMetrics()); - } catch (InterruptedException | ExecutionException e) { + } catch (IOException e) { log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); } @@ -90,16 +67,4 @@ public class FunctionsStatsGenerator { } } 
} - - private static void metricType(SimpleTextOutputStream stream, String name) { - stream.write("# TYPE ").write(name).write(" gauge\n"); - } - - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, - String functionName, String metricName, int instanceId, double value) { - metricType(stream, metricName); - stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); - } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java deleted file mode 100644 index 4b54bcf..0000000 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.functions.worker; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import lombok.ToString; -import org.apache.pulsar.common.util.SimpleTextOutputStream; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; -import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.utils.Utils; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static com.google.common.base.Preconditions.checkArgument; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; - -public class FunctionStatsGeneratorTest { - - @Test - public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() { - WorkerService workerService = mock(WorkerService.class); - when(workerService.isInitialized()).thenReturn(false); - FunctionsStatsGenerator.generate( - workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer())); - verify(workerService, times(1)).isInitialized(); - verify(workerService, times(0)).getFunctionRuntimeManager(); - } - - @Test - public void testGenerateFunctionStatsOnK8SRuntimeFactory() { - WorkerService workerService = mock(WorkerService.class); - when(workerService.isInitialized()).thenReturn(true); - FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class); - 
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class)); - when(workerService.getFunctionRuntimeManager()).thenReturn(frm); - FunctionsStatsGenerator.generate( - workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer())); - verify(workerService, times(1)).isInitialized(); - verify(workerService, times(1)).getFunctionRuntimeManager(); - verify(frm, times(0)).getFunctionRuntimeInfos(); - } - - @Test - public void testFunctionsStatsGenerate() { - FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); - Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>(); - - WorkerService workerService = mock(WorkerService.class); - doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager(); - doReturn(new WorkerConfig()).when(workerService).getWorkerConfig(); - when(workerService.isInitialized()).thenReturn(true); - - CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>(); - InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder() - .setReceivedTotal(101) - .setProcessedSuccessfullyTotal(99) - .setAvgProcessLatency(10.0) - .setUserExceptionsTotal(3) - .setSystemExceptionsTotal(1) - .setLastInvocation(1542324900) - .build(); - - metricsDataCompletableFuture.complete(metricsData); - Runtime runtime = mock(Runtime.class); - doReturn(metricsDataCompletableFuture).when(runtime).getMetrics(); - - RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class); - doReturn(runtime).when(runtimeSpawner).getRuntime(); - - Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder() - .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); - - Function.Instance instance = Function.Instance.newBuilder() - .setFunctionMetaData(function1).setInstanceId(0).build(); - - FunctionRuntimeInfo 
functionRuntimeInfo = mock(FunctionRuntimeInfo.class); - doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner(); - doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); - - functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), functionRuntimeInfo); - doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos(); - - ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); - SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf); - FunctionsStatsGenerator.generate(workerService, "default", statsOut); - - String str = buf.toString(Charset.defaultCharset()); - - buf.release(); - Map<String, Metric> metrics = parseMetrics(str); - - Assert.assertEquals(metrics.size(), 6); - - System.out.println("metrics: " + metrics); - Metric m = metrics.get("pulsar_function_received_total"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 101.0); - - m = metrics.get("pulsar_function_user_exceptions_total"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 3.0); - - m = metrics.get("pulsar_function_process_latency_ms"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 10.0); - - m = metrics.get("pulsar_function_system_exceptions_total"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - 
assertEquals(m.value, 1.0); - - m = metrics.get("pulsar_function_last_invocation"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 1542324900.0); - - m = metrics.get("pulsar_function_processed_successfully_total"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 99.0); - } - - /** - * Hacky parsing of Prometheus text format. Sould be good enough for unit tests - */ - private static Map<String, Metric> parseMetrics(String metrics) { - Map<String, Metric> parsed = new HashMap<>(); - - // Example of lines are - // jvm_threads_current{cluster="standalone",} 203.0 - // or - // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", - // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 - Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$"); - Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); - - Arrays.asList(metrics.split("\n")).forEach(line -> { - if (line.isEmpty() || line.startsWith("#")) { - return; - } - Matcher matcher = pattern.matcher(line); - - checkArgument(matcher.matches()); - String name = matcher.group(1); - - Metric m = new Metric(); - m.value = Double.valueOf(matcher.group(3)); - - String tags = matcher.group(2); - Matcher tagsMatcher = tagsPattern.matcher(tags); - while (tagsMatcher.find()) { - String tag = tagsMatcher.group(1); - String value = tagsMatcher.group(2); - m.tags.put(tag, value); - } - - parsed.put(name, m); - }); - - return parsed; - } - - @ToString - static class Metric { - Map<String, String> tags = new TreeMap<>(); - double value; - } - -}