This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 00ea30d Enable worker JVM metrics to be reported via Prometheus (#8097) 00ea30d is described below commit 00ea30d90aa47bd5843b6a900c0abf3144a480e3 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Tue Sep 22 17:19:22 2020 -0700 Enable worker JVM metrics to be reported via Prometheus (#8097) Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../org/apache/pulsar/functions/worker/Worker.java | 2 +- .../pulsar/functions/worker/WorkerService.java | 10 ++++++--- .../functions/worker/WorkerStatsManager.java | 26 +++++++++++++++++++++- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 968b8b4..239fb27 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -62,7 +62,7 @@ public class Worker { public Worker(WorkerConfig workerConfig) { this.workerConfig = workerConfig; - this.workerService = new WorkerService(workerConfig); + this.workerService = new WorkerService(workerConfig, true); this.errorNotifier = ErrorNotifier.getDefaultImpl(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index c924e80..4b7e23c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -76,12 +76,16 @@ public class WorkerService { private FunctionAssignmentTailer functionAssignmentTailer; private final WorkerStatsManager workerStatsManager; - public WorkerService(WorkerConfig workerConfig) { + public WorkerService(WorkerConfig workerConfig, boolean runAsStandalone) { this.workerConfig = workerConfig; this.statsUpdater = Executors - .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig); - workerStatsManager = new WorkerStatsManager(workerConfig); + this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone); + } + + public WorkerService(WorkerConfig workerConfig) { + this(workerConfig, false); } public void start(URI dlogUri, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java index 5d48959..703b131 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java @@ -19,9 +19,13 @@ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; + +import io.netty.util.internal.PlatformDependent; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Summary; +import io.prometheus.client.hotspot.DefaultExports; import lombok.Setter; import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat; import org.apache.pulsar.functions.proto.Function; @@ -32,6 +36,10 @@ import java.util.List; public class WorkerStatsManager { + static { + DefaultExports.initialize(); + } + private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = "pulsar_function_worker_"; private static final String START_UP_TIME = "start_up_time_ms"; private static final String INSTANCE_COUNT = "instance_count"; @@ -79,7 +87,7 @@ public class WorkerStatsManager { private final Summary.Child _stopInstanceProcessTime; private final Summary.Child _startInstanceProcessTime; - public WorkerStatsManager(WorkerConfig workerConfig) { + public WorkerStatsManager(WorkerConfig workerConfig, boolean runAsStandalone) { metricsLabels = new String[]{workerConfig.getPulsarFunctionsCluster()}; @@ -156,6 +164,22 @@ public class WorkerStatsManager { .quantile(1, 0.01) .register(collectorRegistry); _startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels); + + if (runAsStandalone) { + Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Gauge.Child() { + @Override + public double get() { + return getJvmDirectMemoryUsed(); + } + }).register(CollectorRegistry.defaultRegistry); + + Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() { + @Override + public double get() { + return PlatformDependent.maxDirectMemory(); + } + }).register(CollectorRegistry.defaultRegistry); + } } private Long startupTimeStart;