This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8957e353ded [feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010) 8957e353ded is described below commit 8957e353ded9ee24eccea349c7747da721d9e66a Author: Dragos Misca <dragosvic...@users.noreply.github.com> AuthorDate: Fri Feb 9 15:40:20 2024 -0800 [feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010) Co-authored-by: Matteo Merli <mme...@apache.org> --- .github/workflows/pulsar-ci.yaml | 3 + build/run_integration_group.sh | 4 + distribution/server/src/assemble/LICENSE.bin.txt | 28 +++ pom.xml | 31 ++++ .../ProxySaslAuthenticationTest.java | 1 + pulsar-broker-common/pom.xml | 18 ++ .../stats/prometheus/PrometheusMetricsClient.java | 0 pulsar-broker/pom.xml | 14 ++ .../org/apache/pulsar/broker/PulsarService.java | 6 + .../broker/stats/PulsarBrokerOpenTelemetry.java | 49 +++++ pulsar-functions/worker/pom.xml | 6 + .../worker/PulsarWorkerOpenTelemetry.java | 48 +++++ .../functions/worker/PulsarWorkerService.java | 6 + .../worker/FunctionAssignmentTailerTest.java | 5 + .../pom.xml | 88 ++++----- .../opentelemetry/OpenTelemetryAttributes.java | 32 ++++ .../pulsar/opentelemetry/OpenTelemetryService.java | 108 +++++++++++ .../apache/pulsar/opentelemetry/package-info.java | 24 +++ .../opentelemetry/OpenTelemetryServiceTest.java | 201 +++++++++++++++++++++ pulsar-proxy/pom.xml | 6 + .../apache/pulsar/proxy/server/ProxyService.java | 7 + .../proxy/stats/PulsarProxyOpenTelemetry.java | 49 +++++ .../extensions/SimpleProxyExtensionTestBase.java | 1 + .../server/AdminProxyHandlerKeystoreTLSTest.java | 1 + .../proxy/server/AuthedAdminProxyHandlerTest.java | 1 + .../proxy/server/ProxyAdditionalServletTest.java | 1 + .../ProxyAuthenticatedProducerConsumerTest.java | 1 + .../proxy/server/ProxyAuthenticationTest.java | 4 +- .../server/ProxyConnectionThrottlingTest.java | 1 + .../server/ProxyEnableHAProxyProtocolTest.java | 1 + .../proxy/server/ProxyForwardAuthDataTest.java | 4 +- 
.../server/ProxyKeyStoreTlsTransportTest.java | 1 + .../proxy/server/ProxyKeyStoreTlsWithAuthTest.java | 1 + .../server/ProxyKeyStoreTlsWithoutAuthTest.java | 1 + .../proxy/server/ProxyLookupThrottlingTest.java | 1 + .../pulsar/proxy/server/ProxyMutualTlsTest.java | 1 + .../pulsar/proxy/server/ProxyParserTest.java | 1 + .../pulsar/proxy/server/ProxyRefreshAuthTest.java | 4 +- .../proxy/server/ProxyRolesEnforcementTest.java | 4 +- .../ProxyServiceStarterDisableZeroCopyTest.java | 3 +- .../proxy/server/ProxyServiceStarterTest.java | 1 + .../proxy/server/ProxyServiceTlsStarterTest.java | 1 + .../apache/pulsar/proxy/server/ProxyStatsTest.java | 1 + .../proxy/server/ProxyStuckConnectionTest.java | 1 + .../org/apache/pulsar/proxy/server/ProxyTest.java | 1 + .../apache/pulsar/proxy/server/ProxyTlsTest.java | 1 + .../pulsar/proxy/server/ProxyTlsWithAuthTest.java | 1 + .../server/ProxyWithAuthorizationNegTest.java | 4 +- .../proxy/server/ProxyWithAuthorizationTest.java | 5 +- .../server/ProxyWithExtensibleLoadManagerTest.java | 1 + .../server/ProxyWithJwtAuthorizationTest.java | 4 +- .../server/ProxyWithoutServiceDiscoveryTest.java | 5 +- .../SuperUserAuthedAdminProxyHandlerTest.java | 1 + .../server/UnauthedAdminProxyHandlerTest.java | 1 + tests/integration/pom.xml | 20 +- .../OpenTelemetryCollectorContainer.java | 63 +++++++ .../integration/containers/PulsarContainer.java | 2 + .../metrics/OpenTelemetrySanityTest.java | 165 +++++++++++++++++ .../integration/topologies/PulsarCluster.java | 43 +++-- .../integration/topologies/PulsarClusterSpec.java | 17 ++ .../containers/otel-collector-config.yaml | 43 +++++ .../src/test/resources/pulsar-metrics.xml | 28 +++ tests/integration/src/test/resources/pulsar.xml | 1 + 63 files changed, 1100 insertions(+), 76 deletions(-) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 7767beaa9aa..effeab90beb 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -589,6 +589,9 
@@ jobs: - name: Transaction group: TRANSACTION + - name: Metrics + group: METRICS + steps: - name: checkout uses: actions/checkout@v4 diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index f20a7ad0793..2d82fce0887 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -181,6 +181,10 @@ test_group_transaction() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests } +test_group_metrics() { + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml -DintegrationTests +} + test_group_tiered_filesystem() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests } diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index e3941c54a74..7c95811faf7 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -333,6 +333,13 @@ The Apache Software License, Version 2.0 - io.prometheus-simpleclient_tracer_common-0.16.0.jar - io.prometheus-simpleclient_tracer_otel-0.16.0.jar - io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar + * Prometheus exporter + - io.prometheus-prometheus-metrics-config-1.1.0.jar + - io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar + - io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar + - io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar + - io.prometheus-prometheus-metrics-model-1.1.0.jar + - io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar * Jakarta Bean Validation API - jakarta.validation-jakarta.validation-api-2.0.2.jar - javax.validation-validation-api-1.1.0.Final.jar @@ -503,6 +510,27 @@ The Apache Software License, Version 2.0 * RoaringBitmap - org.roaringbitmap-RoaringBitmap-0.9.44.jar - org.roaringbitmap-shims-0.9.44.jar + * OpenTelemetry + - io.opentelemetry-opentelemetry-api-1.34.1.jar + - 
io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar + - io.opentelemetry-opentelemetry-context-1.34.1.jar + - io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar + - io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar + - io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar + - io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar + - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar + - io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar + - io.opentelemetry-opentelemetry-sdk-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar + - io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar + - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar + - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar + - io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar + - io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar BSD 3-clause "New" or "Revised" License * Google auth library diff --git a/pom.xml b/pom.xml index 4dfeb30821a..52a638ac09f 100644 --- a/pom.xml +++ b/pom.xml @@ -249,6 +249,10 @@ flexible messaging model and an intuitive client API.</description> <disruptor.version>3.4.3</disruptor.version> <zstd-jni.version>1.5.2-3</zstd-jni.version> <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version> + <opentelemetry.version>1.34.1</opentelemetry.version> + <opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version> + <opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version> + <opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version> <!-- test 
dependencies --> <testcontainers.version>1.18.3</testcontainers.version> @@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client API.</description> <version>${restassured.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-bom</artifactId> + <version>${opentelemetry.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-bom-alpha</artifactId> + <version>${opentelemetry.alpha.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry.instrumentation</groupId> + <artifactId>opentelemetry-resources</artifactId> + <version>${opentelemetry.instrumentation.version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry.semconv</groupId> + <artifactId>opentelemetry-semconv</artifactId> + <version>${opentelemetry.semconv.version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-broker-auth-sasl</module> <module>pulsar-client-auth-sasl</module> <module>pulsar-config-validation</module> + <module>pulsar-opentelemetry</module> <module>structured-event-log</module> @@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-broker-auth-sasl</module> <module>pulsar-client-auth-sasl</module> <module>pulsar-config-validation</module> + <module>pulsar-opentelemetry</module> <!-- transaction related modules --> <module>pulsar-transaction</module> diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index f6ad76a083b..a27384c9890 100644 --- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -242,6 +242,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase { proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*"); proxyConfig.setSaslJaasServerSectionName("PulsarProxy"); + proxyConfig.setClusterName(configClusterName); // proxy connect to broker proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName()); diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index d73dba288a3..8e942c78d5b 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -82,10 +82,28 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.gaul</groupId> <artifactId>modernizer-maven-plugin</artifactId> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java similarity index 100% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java rename to pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index c39de184b05..18da38b43dc 100644 --- 
a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -143,6 +143,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-opentelemetry</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-io-batch-discovery-triggerers</artifactId> @@ -209,6 +215,14 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-broker-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- functions related dependencies (end) --> <dependency> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 054411c49f6..3701f354b62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -108,6 +108,7 @@ import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.broker.stats.MetricsGenerator; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; @@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final Timer brokerClientSharedTimer; private MetricsGenerator metricsGenerator; + private PulsarBrokerOpenTelemetry openTelemetry; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider 
transactionBufferProvider; @@ -461,6 +463,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { } resetMetricsServlet(); + if (openTelemetry != null) { + openTelemetry.close(); + } if (this.compactionServiceFactory != null) { try { @@ -897,6 +902,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { } this.metricsGenerator = new MetricsGenerator(this); + this.openTelemetry = new PulsarBrokerOpenTelemetry(config); // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java new file mode 100644 index 00000000000..4b76b993001 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.opentelemetry.OpenTelemetryService; + +public class PulsarBrokerOpenTelemetry implements Closeable { + + public static final String SERVICE_NAME = "pulsar-broker"; + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarBrokerOpenTelemetry(ServiceConfiguration config) { + openTelemetryService = OpenTelemetryService.builder() + .clusterName(config.getClusterName()) + .serviceName(SERVICE_NAME) + .serviceVersion(PulsarVersion.getVersion()) + .build(); + meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker"); + } + + @Override + public void close() { + openTelemetryService.close(); + } +} diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index cd89bacbf9e..bb93eeb98d7 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -46,6 +46,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-opentelemetry</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-client-original</artifactId> diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java new file mode 100644 index 00000000000..be7c15dfd85 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.opentelemetry.OpenTelemetryService; + +public class PulsarWorkerOpenTelemetry implements Closeable { + + public static final String SERVICE_NAME = "pulsar-function-worker"; + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) { + openTelemetryService = OpenTelemetryService.builder() + .clusterName(workerConfig.getPulsarFunctionsCluster()) + .serviceName(SERVICE_NAME) + .serviceVersion(PulsarVersion.getVersion()) + .build(); + meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker"); + } + + @Override + public void close() { + openTelemetryService.close(); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 16cf778e072..9f7d1996e0b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -108,6 +108,7 @@ public class PulsarWorkerService implements WorkerService { private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; private MetricsGenerator metricsGenerator; + private PulsarWorkerOpenTelemetry openTelemetry; @VisibleForTesting private URI dlogUri; private LeaderService leaderService; @@ -188,6 +189,7 @@ public class PulsarWorkerService implements WorkerService { this.statsUpdater = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig); + this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig); this.workerConfig = workerConfig; this.dlogUri = dlogUri; this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone); @@ -659,6 +661,10 @@ public class PulsarWorkerService implements WorkerService { if (null != stateStoreProvider) { stateStoreProvider.close(); } + + if (null != openTelemetry) { + openTelemetry.close(); + } } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java index 022ebd6ba48..c78c68f8923 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java @@ -60,6 +60,8 @@ import org.testng.annotations.Test; @Slf4j public class FunctionAssignmentTailerTest { + private static final String CLUSTER_NAME = "test-cluster"; + @Test(timeOut = 10000) public void testErrorNotifier() throws Exception { WorkerConfig workerConfig = new WorkerConfig(); @@ -71,6 +73,7 @@ public class FunctionAssignmentTailerTest { 
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -183,6 +186,7 @@ public class FunctionAssignmentTailerTest { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -307,6 +311,7 @@ public class FunctionAssignmentTailerTest { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() diff --git a/pulsar-broker-common/pom.xml b/pulsar-opentelemetry/pom.xml similarity index 59% copy from pulsar-broker-common/pom.xml copy to pulsar-opentelemetry/pom.xml index d73dba288a3..82a9658cc9d 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-opentelemetry/pom.xml @@ -20,8 +20,8 @@ --> <project - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" - xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.pulsar</groupId> @@ 
-29,51 +29,58 @@ <version>3.3.0-SNAPSHOT</version> </parent> - <artifactId>pulsar-broker-common</artifactId> - <description>Common classes used in multiple broker modules</description> + <artifactId>pulsar-opentelemetry</artifactId> + <description>OpenTelemetry Integration</description> <dependencies> + <!-- OpenTelemetry dependencies --> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-metadata</artifactId> - <version>${project.version}</version> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-otlp</artifactId> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-prometheus</artifactId> </dependency> - <dependency> - <groupId>io.prometheus</groupId> - <artifactId>simpleclient_jetty</artifactId> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> </dependency> - <dependency> - <groupId>javax.servlet</groupId> - <artifactId>javax.servlet-api</artifactId> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> </dependency> - <dependency> - <groupId>javax.ws.rs</groupId> - <artifactId>javax.ws.rs-api</artifactId> + <groupId>io.opentelemetry.instrumentation</groupId> + <artifactId>opentelemetry-resources</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry.semconv</groupId> + <artifactId>opentelemetry-semconv</artifactId> </dependency> <dependency> - <groupId>io.jsonwebtoken</groupId> - <artifactId>jjwt-impl</artifactId> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> </dependency> <dependency> - <groupId>io.jsonwebtoken</groupId> - <artifactId>jjwt-jackson</artifactId> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> </dependency> <!-- test --> <dependency> - <groupId>org.bouncycastle</groupId> - <artifactId>bc-fips</artifactId> - 
<version>${bouncycastle.bc-fips.version}</version> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-broker-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> @@ -82,6 +89,12 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -117,29 +130,6 @@ </execution> </executions> </plugin> - - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-resources</id> - <phase>test-compile</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${project.build.testOutputDirectory}/certificate-authority</outputDirectory> - <overwrite>true</overwrite> - <resources> - <resource> - <directory>${project.parent.basedir}/tests/certificate-authority</directory> - <filtering>false</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> </project> diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java new file mode 100644 index 00000000000..bdb002cb359 --- /dev/null +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.opentelemetry; + +import io.opentelemetry.api.common.AttributeKey; + +/** + * Common OpenTelemetry attributes to be used by Pulsar components. + */ +public interface OpenTelemetryAttributes { + /** + * The name of the Pulsar cluster. This attribute is automatically added to all signals by + * {@link OpenTelemetryService}. + */ + AttributeKey<String> PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster"); +} diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java new file mode 100644 index 00000000000..5ead1ff265c --- /dev/null +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.opentelemetry; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.ResourceAttributes; +import java.io.Closeable; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import lombok.Builder; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of override properties. Once initialized, furnishes access to OpenTelemetry. + */ +public class OpenTelemetryService implements Closeable { + + static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled"; + static final int MAX_CARDINALITY_LIMIT = 10000; + + private final OpenTelemetrySdk openTelemetrySdk; + + /** + * Instantiates the OpenTelemetry SDK. All attributes are overridden by system properties or environment + * variables. + * + * @param clusterName + * The name of the Pulsar cluster. Cannot be null or blank. + * @param serviceName + * The name of the service. Optional. + * @param serviceVersion + * The version of the service. Optional. 
+ * @param sdkBuilderConsumer + * Allows customizing the SDK builder; for testing purposes only. + */ + @Builder + public OpenTelemetryService(String clusterName, + String serviceName, + String serviceVersion, + @VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> sdkBuilderConsumer) { + checkArgument(StringUtils.isNotBlank(clusterName), "Cluster name cannot be empty"); + var sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder(); + + sdkBuilder.addPropertiesSupplier(() -> Map.of( + OTEL_SDK_DISABLED_KEY, "true", + // Cardinality limit includes the overflow attribute set, so we need to add 1. + "otel.experimental.metrics.cardinality.limit", Integer.toString(MAX_CARDINALITY_LIMIT + 1) + )); + + sdkBuilder.addResourceCustomizer( + (resource, __) -> { + var resourceBuilder = Resource.builder(); + // Do not override attributes if already set (via system properties or environment variables). + if (resource.getAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER) == null) { + resourceBuilder.put(OpenTelemetryAttributes.PULSAR_CLUSTER, clusterName); + } + if (StringUtils.isNotBlank(serviceName) + && Objects.equals(Resource.getDefault().getAttribute(ResourceAttributes.SERVICE_NAME), + resource.getAttribute(ResourceAttributes.SERVICE_NAME))) { + resourceBuilder.put(ResourceAttributes.SERVICE_NAME, serviceName); + } + if (StringUtils.isNotBlank(serviceVersion) + && resource.getAttribute(ResourceAttributes.SERVICE_VERSION) == null) { + resourceBuilder.put(ResourceAttributes.SERVICE_VERSION, serviceVersion); + } + return resource.merge(resourceBuilder.build()); + }); + + if (sdkBuilderConsumer != null) { + sdkBuilderConsumer.accept(sdkBuilder); + } + + openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk(); + } + + public OpenTelemetry getOpenTelemetry() { + return openTelemetrySdk; + } + + @Override + public void close() { + openTelemetrySdk.close(); + } +} diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java new file mode 100644 index 00000000000..9a7426aa047 --- /dev/null +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Provides a wrapper layer for the OpenTelemetry API to be used in Pulsar. + * @since 3.3.0 + */ +package org.apache.pulsar.opentelemetry; \ No newline at end of file diff --git a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java new file mode 100644 index 00000000000..e5c893794a0 --- /dev/null +++ b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.opentelemetry; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.instrumentation.resources.JarServiceNameDetector; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.semconv.ResourceAttributes; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.assertj.core.api.AbstractCharSequenceAssert; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryServiceTest { + + 
private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilderConsumer(getSdkBuilderConsumer(reader, + Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))). + clusterName("openTelemetryServiceTestCluster"). + build(); + meter = openTelemetryService.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument"); + } + + @AfterMethod + public void teardown() throws Exception { + openTelemetryService.close(); + reader.close(); + } + + // Customizes the SDK builder to include the MetricReader and extra properties for testing purposes. + private static Consumer<AutoConfiguredOpenTelemetrySdkBuilder> getSdkBuilderConsumer(MetricReader extraReader, + Map<String, String> extraProperties) { + return autoConfigurationCustomizer -> { + if (extraReader != null) { + autoConfigurationCustomizer.addMeterProviderCustomizer( + (sdkMeterProviderBuilder, __) -> sdkMeterProviderBuilder.registerMetricReader(extraReader)); + } + autoConfigurationCustomizer.addPropertiesSupplier(() -> extraProperties); + }; + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeNull() { + @Cleanup + var ots = OpenTelemetryService.builder().build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeEmpty() { + @Cleanup + var ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build(); + } + + @Test + public void testResourceAttributesAreSet() throws Exception { + @Cleanup + var reader = InMemoryMetricReader.create(); + + @Cleanup + var ots = OpenTelemetryService.builder(). 
+ sdkBuilderConsumer(getSdkBuilderConsumer(reader, + Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false", + "otel.java.disabled.resource.providers", JarServiceNameDetector.class.getName()))). + clusterName("testServiceNameAndVersion"). + serviceName("openTelemetryServiceTestService"). + serviceVersion("1.0.0"). + build(); + + assertThat(reader.collectAllMetrics()) + .allSatisfy(metric -> assertThat(metric) + .hasResourceSatisfying(resource -> resource + .hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testServiceNameAndVersion") + .hasAttribute(ResourceAttributes.SERVICE_NAME, "openTelemetryServiceTestService") + .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0") + .hasAttribute(satisfies(ResourceAttributes.HOST_NAME, AbstractCharSequenceAssert::isNotBlank)))); + } + + @Test + public void testIsInstrumentationNameSetOnMeter() { + var meter = openTelemetryService.getOpenTelemetry().getMeter("testInstrumentationScope"); + meter.counterBuilder("dummyCounter").build().add(1); + assertThat(reader.collectAllMetrics()) + .anySatisfy(metricData -> assertThat(metricData) + .hasInstrumentationScope(InstrumentationScopeInfo.create("testInstrumentationScope"))); + } + + @Test + public void testMetricCardinalityIsSet() { + var prometheusExporterPort = 9464; + @Cleanup + var ots = OpenTelemetryService.builder(). + sdkBuilderConsumer(getSdkBuilderConsumer(null, + Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false", + "otel.metrics.exporter", "prometheus", + "otel.exporter.prometheus.port", Integer.toString(prometheusExporterPort)))). + clusterName("openTelemetryServiceCardinalityTestCluster"). 
+ build(); + var meter = ots.getOpenTelemetry().getMeter("openTelemetryMetricCardinalityTest"); + var counter = meter.counterBuilder("dummyCounter").build(); + for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 100; i++) { + counter.add(1, Attributes.of(AttributeKey.stringKey("attribute"), "value" + i)); + } + + Awaitility.waitAtMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> { + var client = new PrometheusMetricsClient("localhost", prometheusExporterPort); + var allMetrics = client.getMetrics(); + var actualMetrics = allMetrics.findByNameAndLabels("dummyCounter_total"); + var overflowMetric = allMetrics.findByNameAndLabels("dummyCounter_total", "otel_metric_overflow", "true"); + return actualMetrics.size() == OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1 && overflowMetric.size() == 1; + }); + } + + @Test + public void testLongCounter() { + var longCounter = meter.counterBuilder("dummyLongCounter").build(); + var attributes = Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue"); + longCounter.add(1, attributes); + longCounter.add(2, attributes); + + assertThat(reader.collectAllMetrics()) + .anySatisfy(metric -> assertThat(metric) + .hasName("dummyLongCounter") + .hasLongSumSatisfying(sum -> sum + .hasPointsSatisfying(point -> point + .hasAttributes(attributes) + .hasValue(3)))); + } + + @Test + public void testServiceIsDisabledByDefault() throws Exception { + @Cleanup + var metricReader = InMemoryMetricReader.create(); + + @Cleanup + var ots = OpenTelemetryService.builder(). + sdkBuilderConsumer(getSdkBuilderConsumer(metricReader, Map.of())). + clusterName("openTelemetryServiceTestCluster"). 
+ build(); + var meter = ots.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument"); + + var builders = List.of( + meter.counterBuilder("dummyCounterA"), + meter.counterBuilder("dummyCounterB").setDescription("desc"), + meter.counterBuilder("dummyCounterC").setDescription("desc").setUnit("unit"), + meter.counterBuilder("dummyCounterD").setUnit("unit") + ); + + var callback = new AtomicBoolean(); + // Validate that no matter how the counters are being built, they are all backed by the same underlying object. + // This ensures we conserve memory when the SDK is disabled. + assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1); + assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1); + assertThat(builders.stream().map(b -> b.buildWithCallback(__ -> callback.set(true))).distinct()).hasSize(1); + + // Validate that no metrics are being emitted at all. + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + // Validate that the callback has not been called. 
+ assertThat(callback).isFalse(); + } +} diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 8fb1313f9ce..55dfd11e40e 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -49,6 +49,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-opentelemetry</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-docs-tools</artifactId> diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 719c7c2cbda..61b00871cec 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -73,6 +73,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.proxy.extensions.ProxyExtensions; +import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,6 +149,8 @@ public class ProxyService implements Closeable { private PrometheusMetricsServlet metricsServlet; private List<PrometheusRawMetricsProvider> pendingMetricsProviders; + @Getter + private PulsarProxyOpenTelemetry openTelemetry; @Getter private final ConnectionController connectionController; @@ -284,6 +287,7 @@ public class ProxyService implements Closeable { } createMetricsServlet(); + openTelemetry = new PulsarProxyOpenTelemetry(proxyConfig); // Initialize the message protocol handlers. 
// start the protocol handlers only after the broker is ready, @@ -399,6 +403,9 @@ public class ProxyService implements Closeable { proxyAdditionalServlets = null; } + if (openTelemetry != null) { + openTelemetry.close(); + } resetMetricsServlet(); if (localMetadataStore != null) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java new file mode 100644 index 00000000000..14bbc649466 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.opentelemetry.OpenTelemetryService; +import org.apache.pulsar.proxy.server.ProxyConfiguration; + +public class PulsarProxyOpenTelemetry implements Closeable { + + public static final String SERVICE_NAME = "pulsar-proxy"; + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarProxyOpenTelemetry(ProxyConfiguration config) { + openTelemetryService = OpenTelemetryService.builder() + .clusterName(config.getClusterName()) + .serviceName(SERVICE_NAME) + .serviceVersion(PulsarVersion.getVersion()) + .build(); + meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy"); + } + + @Override + public void close() { + openTelemetryService.close(); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index fde7c938d0a..f9ace716ecd 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -140,6 +140,7 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java index bc2029861f4..92c644b470d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java @@ -101,6 +101,7 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s", KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW)); + proxyConfig.setClusterName(configClusterName); resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index d83de9652cf..ef58648e35a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -85,6 +85,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { proxyConfig.setWebServicePortTls(Optional.of(0)); proxyConfig.setTlsEnabledWithBroker(true); proxyConfig.setHttpMaxRequestHeaderSize(20000); + proxyConfig.setClusterName(configClusterName); // enable tls and auth&auth at proxy proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index 9f8efa1ec79..f61a73bbf91 100644 --- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -78,6 +78,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); // enable full parsing feature proxyConfig.setProxyLogLevel(Optional.of(2)); + proxyConfig.setClusterName(configClusterName); // this is for nar package test // addServletNar(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 1c93cb20c70..4083c984d98 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -137,6 +137,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index fec0673ff9b..662b8305c0e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -58,6 +58,7 @@ import org.testng.annotations.Test; public class ProxyAuthenticationTest extends ProducerConsumerBase { private static final Logger log = 
LoggerFactory.getLogger(ProxyAuthenticationTest.class); + private static final String CLUSTER_NAME = "test"; public static class BasicAuthenticationData implements AuthenticationDataProvider { private final String authParam; @@ -178,7 +179,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { providers.add(BasicAuthenticationProvider.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("test"); + conf.setClusterName(CLUSTER_NAME); Set<String> proxyRoles = new HashSet<>(); proxyRoles.add("proxy"); conf.setProxyRoles(proxyRoles); @@ -222,6 +223,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setClusterName(CLUSTER_NAME); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index a070d1e84d3..78ab9bd0d95 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -59,6 +59,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); 
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 5704ba55fed..413774daf2c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -60,6 +60,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setHaProxyProtocolEnabled(true); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 477fe597f26..5e969ca26e4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -46,6 +46,7 @@ import org.testng.annotations.Test; public class ProxyForwardAuthDataTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class); + private static final String CLUSTER_NAME = "test"; @BeforeMethod @Override @@ -64,7 +65,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { providers.add(BasicAuthenticationProvider.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("test"); + conf.setClusterName(CLUSTER_NAME); Set<String> proxyRoles = new 
HashSet<String>(); proxyRoles.add("proxy"); conf.setProxyRoles(proxyRoles); @@ -109,6 +110,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); + proxyConfig.setClusterName(CLUSTER_NAME); Set<String> providers = new HashSet<>(); providers.add(BasicAuthenticationProvider.class.getName()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java index 5ee03395b80..5671c527f68 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java @@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); proxyConfig.setTlsRequireTrustedClientCertOnConnect(false); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java index 1f21281a6f6..99fb8c03a81 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java @@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsWithAuthTest extends MockedPulsarServiceBaseTest { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); // config for 
authentication and authorization. proxyConfig.setTlsRequireTrustedClientCertOnConnect(true); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java index d7935755ce0..1dcebda7935 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java @@ -74,6 +74,7 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends MockedPulsarServiceBaseTest proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 167c3b19646..a9017404d0e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -65,6 +65,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); + proxyConfig.setClusterName(configClusterName); AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java index 08066f2e5bf..fae44c00ada 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java @@ -66,6 +66,7 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setTlsRequireTrustedClientCertOnConnect(true); proxyConfig.setTlsAllowInsecureConnection(false); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 0d93185f5e8..3f58250e6d6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -71,6 +71,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); //enable full parsing feature proxyConfig.setProxyLogLevel(Optional.ofNullable(2)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java index 6beed27cb66..d06cf4201ff 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -52,6 +52,7 @@ import org.testng.annotations.Test; @Slf4j public class 
ProxyRefreshAuthTest extends ProducerConsumerBase { + private static final String CLUSTER_NAME = "proxy-authorization"; private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); private ProxyService proxyService; @@ -84,7 +85,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { properties.setProperty("tokenAllowedClockSkewSeconds", "2"); conf.setProperties(properties); - conf.setClusterName("proxy-authorization"); + conf.setClusterName(CLUSTER_NAME); conf.setNumExecutorThreadPoolSize(5); conf.setAuthenticationRefreshCheckSeconds(1); @@ -116,6 +117,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { proxyConfig.setServicePort(Optional.of(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setClusterName(CLUSTER_NAME); proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters( diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 137ea829515..a1ffc13ee93 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -49,6 +49,7 @@ import org.testng.annotations.Test; public class ProxyRolesEnforcementTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyRolesEnforcementTest.class); + private static final String CLUSTER_NAME = "test"; public static class BasicAuthenticationData implements AuthenticationDataProvider { private final String authParam; @@ -154,7 +155,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase { providers.add(BasicAuthenticationProvider.class.getName()); 
conf.setAuthenticationProviders(providers); - conf.setClusterName("test"); + conf.setClusterName(CLUSTER_NAME); Set<String> proxyRoles = new HashSet<>(); proxyRoles.add("proxy"); conf.setProxyRoles(proxyRoles); @@ -209,6 +210,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setClusterName(CLUSTER_NAME); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java index 0c9fa5c7ac3..3e598a57277 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java @@ -21,7 +21,7 @@ package org.apache.pulsar.proxy.server; import java.util.Optional; import org.testng.annotations.BeforeClass; -public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterTest{ +public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterTest { @Override @BeforeClass @@ -35,6 +35,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterT serviceStarter.getConfig().setWebSocketServiceEnabled(true); serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); serviceStarter.getConfig().setProxyZeroCopyModeEnabled(false); + serviceStarter.getConfig().setClusterName(configClusterName); serviceStarter.start(); serviceUrl = serviceStarter.getProxyService().getServiceUrl(); } diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 71b1087ee64..f2632861253 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -61,6 +61,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { serviceStarter.getConfig().setServicePort(Optional.of(0)); serviceStarter.getConfig().setWebSocketServiceEnabled(true); serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); + serviceStarter.getConfig().setClusterName(configClusterName); serviceStarter.start(); serviceUrl = serviceStarter.getProxyService().getServiceUrl(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java index b21162577a2..61718bbac3a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -68,6 +68,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest { serviceStarter.getConfig().setTlsCertificateFilePath(PROXY_CERT_FILE_PATH); serviceStarter.getConfig().setTlsKeyFilePath(PROXY_KEY_FILE_PATH); serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); + serviceStarter.getConfig().setClusterName(configClusterName); serviceStarter.start(); serviceUrl = serviceStarter.getProxyService().getServiceUrlTls(); webPort = serviceStarter.getServer().getListenPortHTTP().get(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 155fbf616b0..2866c6c2690 100644 --- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -72,6 +72,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); // enable full parsing feature proxyConfig.setProxyLogLevel(Optional.of(2)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java index 79ea7c5d6a3..6e66008c15a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -79,6 +79,7 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePort(Optional.ofNullable(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setClusterName(configClusterName); startProxyService(); // use the same port for subsequent restarts diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index ac08052aaf1..9bc12dcc6fc 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -106,6 +106,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); } @Override diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index a1b27abece4..4e300d39741 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -61,6 +61,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java index ec5cace8a06..16f610d6d0a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java @@ -73,6 +73,7 @@ public class ProxyTlsWithAuthTest extends MockedPulsarServiceBaseTest { " \"issuerUrl\":\"" + server.getIssuer() + "\"," + " \"audience\": \"an-audience\"," + " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}"); + proxyConfig.setClusterName(configClusterName); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index e0dcefe2714..cf9ad5831ec 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -57,6 +57,7 @@ import org.testng.collections.Maps; public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationNegTest.class); + private static final String CLUSTER_NAME = "proxy-authorization-neg"; private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cacert.pem"; private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem"; @@ -104,7 +105,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { providers.add(AuthenticationProviderTls.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("proxy-authorization-neg"); + conf.setClusterName(CLUSTER_NAME); conf.setNumExecutorThreadPoolSize(5); super.init(); @@ -121,6 +122,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); proxyConfig.setTlsEnabledWithBroker(true); + proxyConfig.setClusterName(CLUSTER_NAME); // enable tls and auth&auth at proxy proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 4e4c3c550cf..bc96c7ea510 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -64,6 +64,7 @@ import org.testng.collections.Maps; public class ProxyWithAuthorizationTest extends ProducerConsumerBase { private static final Logger log = 
LoggerFactory.getLogger(ProxyWithAuthorizationTest.class); + private static final String CLUSTER_NAME = "proxy-authorization"; private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); private final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "Client", Optional.empty()); @@ -189,7 +190,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); conf.setProperties(properties); - conf.setClusterName("proxy-authorization"); + conf.setClusterName(CLUSTER_NAME); conf.setNumExecutorThreadPoolSize(5); } @@ -206,6 +207,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls()); proxyConfig.setAdvertisedAddress(null); + proxyConfig.setClusterName(CLUSTER_NAME); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); @@ -432,6 +434,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setAdvertisedAddress(null); + proxyConfig.setClusterName(CLUSTER_NAME); proxyConfig.setServicePort(Optional.of(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java index f997532b273..d3c05fec721 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java @@ -110,6 +110,7 @@ public class ProxyWithExtensibleLoadManagerTest 
extends MultiBrokerBaseTest { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); return proxyConfig; } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 14be7dadc41..5fb3e046824 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -67,6 +67,7 @@ import org.testng.annotations.Test; public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyWithJwtAuthorizationTest.class); + private static final String CLUSTER_NAME = "proxy-authorization"; private final String ADMIN_ROLE = "admin"; private final String PROXY_ROLE = "proxy"; @@ -104,7 +105,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { providers.add(AuthenticationProviderToken.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("proxy-authorization"); + conf.setClusterName(CLUSTER_NAME); conf.setNumExecutorThreadPoolSize(5); super.init(); @@ -119,6 +120,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { proxyConfig.setServicePort(Optional.of(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setClusterName(CLUSTER_NAME); // enable auth&auth and use JWT at proxy proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index e09194bb21d..9d9490e74b5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -54,9 +54,11 @@ import org.testng.collections.Maps; public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class); + private static final String CLUSTER_NAME = "without-service-discovery"; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + @BeforeMethod @Override protected void setup() throws Exception { @@ -89,7 +91,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { providers.add(AuthenticationProviderTls.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("without-service-discovery"); + conf.setClusterName(CLUSTER_NAME); conf.setNumExecutorThreadPoolSize(5); super.init(); @@ -106,6 +108,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); proxyConfig.setTlsEnabledWithBroker(true); + proxyConfig.setClusterName(CLUSTER_NAME); // enable tls and auth&auth at proxy proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index a44e2a85efa..57522186c8f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ 
-80,6 +80,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); proxyConfig.setTlsEnabledWithBroker(true); + proxyConfig.setClusterName(configClusterName); // enable tls and auth&auth at proxy proxyConfig.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index d239815ae81..fe8b1f45385 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -75,6 +75,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { proxyConfig.setStatusFilePath(STATUS_FILE_PATH); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index df36c35a191..5582931851b 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -55,6 +55,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-broker-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-common</artifactId> @@ -73,6 +80,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-proxy</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>managed-ledger</artifactId> @@ -169,7 +182,6 @@ <scope>test</scope> </dependency> - <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> @@ -189,6 +201,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <!-- kinesis--> <dependency> <groupId>org.testcontainers</groupId> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java new file mode 100644 index 00000000000..2b115ca6b95 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.containers; + +import java.time.Duration; +import org.apache.http.HttpStatus; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +public class OpenTelemetryCollectorContainer extends ChaosContainer<OpenTelemetryCollectorContainer> { + + private static final String IMAGE_NAME = "otel/opentelemetry-collector-contrib:latest"; + private static final String NAME = "otel-collector"; + + public static final int PROMETHEUS_EXPORTER_PORT = 8889; + private static final int OTLP_RECEIVER_PORT = 4317; + private static final int ZPAGES_PORT = 55679; + + public OpenTelemetryCollectorContainer(String clusterName) { + super(clusterName, IMAGE_NAME); + } + + @Override + protected void configure() { + super.configure(); + + this.withCopyFileToContainer( + MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 0644), + "/etc/otel-collector-config.yaml") + .withCommand("--config=/etc/otel-collector-config.yaml") + .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, ZPAGES_PORT) + .waitingFor(new HttpWaitStrategy() + .forPath("/debug/servicez") + .forPort(ZPAGES_PORT) + .forStatusCode(HttpStatus.SC_OK) + .withStartupTimeout(Duration.ofSeconds(300))); + } + + @Override + public String getContainerName() { + return clusterName + "-" + NAME; + } + + public String getOtlpEndpoint() { + return String.format("http://%s:%d", NAME, OTLP_RECEIVER_PORT); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java index 56d64ce5b2c..77cdc1bfd28 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java @@ -26,6 +26,7 @@ import 
java.io.UncheckedIOException; import java.time.Duration; import java.util.Objects; import java.util.UUID; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; @@ -70,6 +71,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING = Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING")); + @Getter protected final String hostname; private final String serviceName; private final String serviceEntryPoint; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java new file mode 100644 index 00000000000..38afc1f127d --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry; +import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class OpenTelemetrySanityTest { + + // Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp); + var proxyCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp); + var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .proxyEnvs(proxyCollectorProps) + .externalService("otel-collector", openTelemetryCollectorContainer) + .functionWorkerEnvs(functionWorkerCollectorProps) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1); + + // TODO: Validate cluster name and service version are present once + // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved. + var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK. 
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus( + openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); + return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty(); + }); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus( + openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); + return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty(); + }); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus( + openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); + return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty(); + }); + } + + /* + * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same + * process space as the broker/proxy/function-worker. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsPrometheusExport() throws Exception { + var prometheusExporterPort = 9464; + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + + var exporter = "prometheus"; + var prometheusExporterPortProp = + Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort)); + + var brokerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp); + var proxyCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp); + var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .brokerAdditionalPorts(List.of(prometheusExporterPort)) + .proxyEnvs(proxyCollectorProps) + .proxyAdditionalPorts(List.of(prometheusExporterPort)) + .functionWorkerEnvs(functionWorkerCollectorProps) + .functionWorkerAdditionalPorts(List.of(prometheusExporterPort)) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1); + + var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. 
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort); + return !metrics.findByNameAndLabels(metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME), + Pair.of("service_version", PulsarVersion.getVersion()), + Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty(); + }); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort); + return !metrics.findByNameAndLabels(metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME), + Pair.of("service_version", PulsarVersion.getVersion()), + Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty(); + }); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort); + return !metrics.findByNameAndLabels(metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME), + Pair.of("service_version", PulsarVersion.getVersion()), + Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty(); + }); + } + + private static PrometheusMetricsClient.Metrics getMetricsFromPrometheus(ChaosContainer<?> container, int port) { + var client = new PrometheusMetricsClient(container.getHost(), container.getMappedPort(port)); + return client.getMetrics(); + } + + private static Map<String, String> getOpenTelemetryProps(String exporter, Pair<String, String> ... 
extraProps) { + var defaultProps = Map.of( + "OTEL_SDK_DISABLED", "false", + "OTEL_METRIC_EXPORT_INTERVAL", "1000", + "OTEL_METRICS_EXPORTER", exporter + ); + var props = new HashMap<>(defaultProps); + Arrays.stream(extraProps).forEach(p -> props.put(p.getKey(), p.getValue())); + return props; + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index bc9b1e267b9..5f893f67f74 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -102,6 +102,8 @@ public class PulsarCluster { private final ProxyContainer proxyContainer; private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap(); private Map<String, Map<String, String>> externalServiceEnvs; + private final Map<String, String> functionWorkerEnvs; + private final List<Integer> functionWorkerAdditionalPorts; private final String metadataStoreUrl; private final String configurationMetadataStoreUrl; @@ -182,6 +184,9 @@ public class PulsarCluster { if (spec.proxyMountFiles != null) { spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind); } + if (spec.proxyAdditionalPorts != null) { + spec.proxyAdditionalPorts.forEach(this.proxyContainer::addExposedPort); + } // create bookies bookieContainers.putAll( @@ -268,6 +273,8 @@ public class PulsarCluster { workerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE)); }); + functionWorkerEnvs = spec.functionWorkerEnvs; + functionWorkerAdditionalPorts = spec.functionWorkerAdditionalPorts; } public String getPlainTextServiceUrl() { @@ -475,23 +482,25 @@ public class PulsarCluster { String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT; String httpServiceUrl 
= "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT; workerContainers.putAll(runNumContainers( - "functions-worker-process-" + suffix, - numFunctionWorkers, - (name) -> new WorkerContainer(clusterName, name) - .withNetwork(network) - .withNetworkAliases(name) - // worker settings - .withEnv("PF_workerId", name) - .withEnv("PF_workerHostname", name) - .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) - .withEnv("PF_pulsarFunctionsCluster", clusterName) - .withEnv("PF_pulsarServiceUrl", serviceUrl) - .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) - // script - .withEnv("clusterName", clusterName) - .withEnv("zookeeperServers", ZKContainer.NAME) - // bookkeeper tools - .withEnv("zkServers", ZKContainer.NAME) + "functions-worker-process-" + suffix, + numFunctionWorkers, + (name) -> new WorkerContainer(clusterName, name) + .withNetwork(network) + .withNetworkAliases(name) + // worker settings + .withEnv("PF_workerId", name) + .withEnv("PF_workerHostname", name) + .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) + .withEnv("PF_pulsarFunctionsCluster", clusterName) + .withEnv("PF_pulsarServiceUrl", serviceUrl) + .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) + // script + .withEnv("clusterName", clusterName) + .withEnv("zookeeperServers", ZKContainer.NAME) + // bookkeeper tools + .withEnv("zkServers", ZKContainer.NAME) + .withEnv(functionWorkerEnvs) + .withExposedPorts(functionWorkerAdditionalPorts.toArray(new Integer[0])) )); this.startWorkers(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index b705b347cff..8a991be49fa 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -147,6 
+147,12 @@ public class PulsarClusterSpec { */ Map<String, String> bookkeeperEnvs; + /** + * Specify envs for function workers. + */ + @Singular + Map<String, String> functionWorkerEnvs; + /** * Specify mount files. */ @@ -170,6 +176,17 @@ public class PulsarClusterSpec { */ List<Integer> bookieAdditionalPorts; + /** + * Additional ports to expose on proxy containers. + */ + List<Integer> proxyAdditionalPorts; + + /** + * Additional ports to expose on function workers. + */ + @Singular + List<Integer> functionWorkerAdditionalPorts; + /** * Enable TLS for connection. */ diff --git a/tests/integration/src/test/resources/containers/otel-collector-config.yaml b/tests/integration/src/test/resources/containers/otel-collector-config.yaml new file mode 100644 index 00000000000..bd332f04283 --- /dev/null +++ b/tests/integration/src/test/resources/containers/otel-collector-config.yaml @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +receivers: + otlp: + protocols: + grpc: + +exporters: + prometheus: + endpoint: "0.0.0.0:8889" + +processors: + batch: + +extensions: + health_check: + zpages: + endpoint: :55679 + +service: + extensions: [zpages, health_check] + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] \ No newline at end of file diff --git a/tests/integration/src/test/resources/pulsar-metrics.xml b/tests/integration/src/test/resources/pulsar-metrics.xml new file mode 100644 index 00000000000..1c87f2bdf0d --- /dev/null +++ b/tests/integration/src/test/resources/pulsar-metrics.xml @@ -0,0 +1,28 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" > +<suite name="Pulsar Metrics Integration Tests" verbose="2" annotations="JDK"> + <test name="metrics-test-suite" preserve-order="true"> + <classes> + <class name="org.apache.pulsar.tests.integration.metrics.OpenTelemetrySanityTest"/> + </classes> + </test> +</suite> diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml index bdc5f27cc78..aa9a59a6cda 100644 --- a/tests/integration/src/test/resources/pulsar.xml +++ b/tests/integration/src/test/resources/pulsar.xml @@ -37,5 +37,6 @@ <suite-file path="./pulsar-python.xml" /> <suite-file path="./pulsar-semantics.xml" /> <suite-file path="./pulsar-upgrade.xml" /> + <suite-file path="./pulsar-metrics.xml" /> </suite-files> </suite>