This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1b57944b1 [feat][misc] PIP-264: Copy OpenTelemetry resource attributes to Prometheus labels (#23005)
dd1b57944b1 is described below

commit dd1b57944b117d16ebd371996b44c02af2ce325c
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Fri Jul 5 00:55:06 2024 -0700

    [feat][misc] PIP-264: Copy OpenTelemetry resource attributes to Prometheus labels (#23005)
---
 .../pulsar/opentelemetry/OpenTelemetryService.java | 15 +++++++++
 .../metrics/OpenTelemetrySanityTest.java           | 39 ++++++++++++----------
 2 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
index b32d353eb5a..e6c6d95273e 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.opentelemetry;
 import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
 import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
 import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
@@ -97,6 +98,20 @@ public class OpenTelemetryService implements Closeable {
                     return resource.merge(resourceBuilder.build());
                 });
 
+        sdkBuilder.addMetricReaderCustomizer((metricReader, configProperties) -> {
+            if (metricReader instanceof PrometheusHttpServer prometheusHttpServer) {
+                // At this point, the server is already started. We need to close it and create a new one with the
+                // correct resource attributes filter.
+                prometheusHttpServer.close();
+
+                // Allow all resource attributes to be exposed.
+                return prometheusHttpServer.toBuilder()
+                        .setAllowedResourceAttributesFilter(s -> true)
+                        .build();
+            }
+            return metricReader;
+        });
+
         if (builderCustomizer != null) {
             builderCustomizer.accept(sdkBuilder);
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
index 38afc1f127d..31e600f3aa8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.metrics;
 
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.waitAtMost;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +39,6 @@ import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
-import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 public class OpenTelemetrySanityTest {
@@ -71,17 +72,17 @@ public class OpenTelemetrySanityTest {
         // TODO: Validate cluster name and service version are present once
         // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
         var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
             var metrics = getMetricsFromPrometheus(
                     openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
             return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
         });
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
             var metrics = getMetricsFromPrometheus(
                     openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
             return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
         });
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
             var metrics = getMetricsFromPrometheus(
                     openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
             return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
@@ -120,30 +121,34 @@ public class OpenTelemetrySanityTest {
         pulsarCluster.start();
         pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);
 
-        var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
-            var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
-            return !metrics.findByNameAndLabels(metricName,
+        var targetInfoMetricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
+        var cpuCountMetricName = "jvm_cpu_count"; // Configured by the OpenTelemetryService.
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName, "pulsar_broker_topic_producer_count"};
+            var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
+            assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
                     Pair.of("pulsar_cluster", clusterName),
                     Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
                     Pair.of("service_version", PulsarVersion.getVersion()),
-                    Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
+                    Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty());
         });
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
-            var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
-            return !metrics.findByNameAndLabels(metricName,
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
+            var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
+            assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
                     Pair.of("pulsar_cluster", clusterName),
                     Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
                     Pair.of("service_version", PulsarVersion.getVersion()),
-                    Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
+                    Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty());
         });
-        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
-            var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
-            return !metrics.findByNameAndLabels(metricName,
+        waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
+            var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
+            assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
                     Pair.of("pulsar_cluster", clusterName),
                     Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
                     Pair.of("service_version", PulsarVersion.getVersion()),
-                    Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
+                    Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty());
         });
     }
 

Reply via email to