This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 767c0d9 Add additional metrics for Pulsar Function Worker (#7685) 767c0d9 is described below commit 767c0d967ff12fdc62ae4ef58ff91b44e137187b Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Jul 29 16:49:05 2020 -0700 Add additional metrics for Pulsar Function Worker (#7685) Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../pulsar/functions/worker/WorkerService.java | 2 + .../functions/worker/WorkerStatsManager.java | 71 ++++++++++++++++++---- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 94d3fbd..c924e80 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -284,6 +284,8 @@ public class WorkerService { log.info("/** Started worker id={} **/", workerConfig.getWorkerId()); workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager); + workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager); + workerStatsManager.setLeaderService(leaderService); workerStatsManager.startupTimeEnd(); } catch (Throwable t) { log.error("Error Starting up in worker", t); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java index a14039e..5d48959 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java @@ -24,22 +24,27 @@ import io.prometheus.client.Gauge; import io.prometheus.client.Summary; import lombok.Setter; import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat; +import org.apache.pulsar.functions.proto.Function; import java.io.IOException; import java.io.StringWriter; +import java.util.List; public class WorkerStatsManager { private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = "pulsar_function_worker_"; private static final String START_UP_TIME = "start_up_time_ms"; private static final String INSTANCE_COUNT = "instance_count"; + private static final String TOTAL_EXPECTED_INSTANCE_COUNT = "total_expected_instance_count"; + private static final String TOTAL_FUNCTIONS_COUNT = "total_function_count"; private static final String SCHEDULE_TOTAL_EXEC_TIME = "schedule_execution_time_total_ms"; private static final String SCHEDULE_STRATEGY_EXEC_TIME = "schedule_strategy_execution_time_ms"; private static final String REBALANCE_TOTAL_EXEC_TIME = "rebalance_execution_time_total_ms"; private static final String REBALANCE_STRATEGY_EXEC_TIME = "rebalance_strategy_execution_time_ms"; private static final String STOPPING_INSTANCE_PROCESS_TIME = "stop_instance_process_time_ms"; - private static final String UPDATING_INSTANCE_PROCESS_TIME = "update_instance_process_time_ms"; private static final String STARTING_INSTANCE_PROCESS_TIME = "start_instance_process_time_ms"; + private static final String IS_LEADER = "is_leader"; + private static final String[] metricsLabelNames = {"cluster"}; private final String[] metricsLabels; @@ -47,6 +52,12 @@ public class WorkerStatsManager { @Setter private FunctionRuntimeManager functionRuntimeManager; + @Setter + private FunctionMetaDataManager functionMetaDataManager; + + @Setter + private LeaderService leaderService; + private CollectorRegistry collectorRegistry = new CollectorRegistry(); private final Summary statWorkerStartupTime; @@ -92,8 +103,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _scheduleTotalExecutionTime = scheduleTotalExecutionTime.labels(metricsLabels); @@ -103,8 +113,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _scheduleStrategyExecutionTime = scheduleStrategyExecutionTime.labels(metricsLabels); @@ -114,8 +123,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _rebalanceTotalExecutionTime = rebalanceTotalExecutionTime.labels(metricsLabels); @@ -125,8 +133,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _rebalanceStrategyExecutionTime = rebalanceStrategyExecutionTime.labels(metricsLabels); @@ -136,8 +143,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _stopInstanceProcessTime = stopInstanceProcessTime.labels(metricsLabels); @@ -147,8 +153,7 @@ public class WorkerStatsManager { .labelNames(metricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) + .quantile(1, 0.01) .register(collectorRegistry); _startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels); } @@ -245,6 +250,46 @@ public class WorkerStatsManager { PrometheusTextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples()); + generateLeaderMetrics(outputWriter); return outputWriter.toString(); } + + private void generateLeaderMetrics(StringWriter stream) { + if (leaderService.isLeader()) { + + List<Function.FunctionMetaData> metadata = functionMetaDataManager.getAllFunctionMetaData(); + // get total number functions + long totalFunctions = metadata.size(); + writeMetric(TOTAL_FUNCTIONS_COUNT, totalFunctions, stream); + + // get total expected number of instances + long totalInstances = 0; + for (Function.FunctionMetaData entry : metadata) { + totalInstances += entry.getFunctionDetails().getParallelism(); + } + writeMetric(TOTAL_EXPECTED_INSTANCE_COUNT, totalInstances, stream); + + // is this worker is the leader + writeMetric(IS_LEADER, 1, stream); + } + } + + private void writeMetric(String metricName, long value, StringWriter stream) { + stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX); + stream.write(metricName); + stream.write("{"); + + for (int i = 0; i < metricsLabelNames.length; i++) { + stream.write(metricsLabelNames[i]); + stream.write('='); + stream.write('\"'); + stream.write(metricsLabels[i]); + stream.write("\","); + } + stream.write('}'); + + stream.write(' '); + stream.write(String.valueOf(value)); + stream.write('\n'); + } }