This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e93c96c1a37b CAMEL-16866 add exchange event-notifier (#19886)
e93c96c1a37b is described below
commit e93c96c1a37b0240fe458b9b4021d1054092cd2b
Author: Jono Morris <[email protected]>
AuthorDate: Wed Nov 12 00:34:13 2025 +1300
CAMEL-16866 add exchange event-notifier (#19886)
---
.../main/docs/opentelemetry-metrics-component.adoc | 44 ++-
.../metrics/OpenTelemetryConstants.java | 9 +
.../OpenTelemetryExchangeEventNotifier.java | 284 +++++++++++++++++++
...lemetryExchangeEventNotifierNamingStrategy.java | 104 +++++++
.../metrics/AbstractOpenTelemetryTest.java | 6 +-
...nTelemetryExchangeEventNotifierDynamicTest.java | 142 ++++++++++
.../OpenTelemetryExchangeEventNotifierTest.java | 307 +++++++++++++++++++++
...TelemetryExchangeEventNotifierTimeUnitTest.java | 132 +++++++++
.../OpenTelemetryExchangeNotifierStaticTest.java | 65 +++++
.../ExchangeEventNotifierAutoConfigIT.java | 150 ++++++++++
10 files changed, 1238 insertions(+), 5 deletions(-)
diff --git
a/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
b/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
index cf3b38744b92..1c434d896401 100644
---
a/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
+++
b/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
@@ -61,7 +61,7 @@ OpenTelemetry meter configuration involves setting up a
`MeterProvider` that is
These `Meters` are then used to create instruments like counters, gauges or
histograms.
==== How it works
-OpenTelemetry metrics are collected by instrumented application that record
measurements using instruments like counters and gauges.
+OpenTelemetry metrics are collected by instrumented applications that record
measurements using instruments like counters and gauges.
These measurements are then processed by an OpenTelemetry SDK, which
aggregates and batches them before exporting them,
either directly or through the OpenTelemetry Collector. See
https://opentelemetry.io/docs/languages/java/sdk/#sdkmeterprovider[SdkMeterProvider
documentation]
for details on how to configure this.
@@ -294,9 +294,14 @@ from("direct:in")
=== OpenTelemetry Event Notification
+There are event notifiers available for OpenTelemetry to capture Camel route
and exchange events.
+The xref:#OpenTelemetryComponent-route-event-notifier[Route Event Notifier]
counts the number of added and running routes,
+while the xref:#OpenTelemetryComponent-exchange-event-notifier[Exchange Event
Notifier] times exchanges from their creation to their completion.
+
+[[OpenTelemetryComponent-route-event-notifier]]
==== Camel Route Event Notifier
-The Route Event Notifier counts the number of added and running routes, and
can be added to the CamelContext as follows:
+A Route event notifier can be added to the CamelContext as follows:
[source,java]
----
@@ -321,6 +326,41 @@ The following options are supported:
| namingStrategy | OpenTelemetryRouteEventNotifierNamingStrategy.DEFAULT | The
strategy to use for overriding default metric names.
|=======================================================================
+[[OpenTelemetryComponent-exchange-event-notifier]]
+==== Camel Exchange Event Notifier
+
+An Exchange event notifier can be added to the CamelContext as follows:
+
+[source,java]
+----
+camelContext.getManagementStrategy().addEventNotifier(new
OpenTelemetryExchangeEventNotifier());
+----
+
+Camel specific metrics that are available via the
`OpenTelemetryExchangeEventNotifier`:
+
+[width="100%",options="header"]
+|=====================================================
+|Default Name |Type |Unit |Description
+|camel.exchange.sent | LongHistogram | TimeUnit.MILLISECONDS | Time taken to
send a message to the endpoint
+|camel.exchange.elapsed | LongHistogram | TimeUnit.MILLISECONDS | Time taken
to complete exchange
+|camel.exchanges.last.time | ObservableLongGauge | TimeUnit.MILLISECONDS |
Last exchange processed time since the Unix epoch
+|camel.exchanges.inflight | ObservableLongGauge | Long | Number of in flight
messages per route
+|=====================================================
+
+The following options are supported:
+
+[width="100%",options="header"]
+|=======================================================================
+|Name |Default |Description
+| ignoreExchanges | false | A predicate allowing certain exchanges to be
ignored from metrics capture.
+| namingStrategy | OpenTelemetryExchangeEventNotifierNamingStrategy.DEFAULT |
The strategy to use for overriding default metric names.
+| durationUnit | TimeUnit.MILLISECONDS | The time unit to use for exchange
'sent' and 'elapsed' metrics.
+| lastExchangeUnit | TimeUnit.MILLISECONDS | The time unit to use for 'last
time' metric.
+| baseEndpointURI | true | Whether to use static or dynamic values for
Endpoint Name attributes in captured metrics.
+By default, static values are used. When using dynamic attributes a dynamic to
(toD) can compute many different endpoint URIs
+leading to high cardinality in metrics.
+|=======================================================================
+
== OpenTelemetry Configuration
Applications can export collected metrics to various backends using different
exporters, including the OpenTelemetry Protocol (OTLP) exporter,
diff --git
a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
index 5514cc29af99..7a3592f0964c 100644
---
a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
+++
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
@@ -39,6 +39,12 @@ public class OpenTelemetryConstants {
public static final String HEADER_METRIC_ATTRIBUTES = HEADER_PREFIX +
"Attributes";
+ // Exchange-event metrics
+ public static final String DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT =
"camel.exchanges.inflight";
+ public static final String DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER =
"camel.exchange.elapsed";
+ public static final String DEFAULT_CAMEL_EXCHANGE_SENT_TIMER =
"camel.exchange.sent";
+ public static final String
DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT =
"camel.exchanges.last.time";
+
// Route-event metrics
public static final String DEFAULT_CAMEL_ROUTES_ADDED =
"camel.routes.added";
public static final String DEFAULT_CAMEL_ROUTES_RUNNING =
"camel.routes.running";
@@ -47,10 +53,13 @@ public class OpenTelemetryConstants {
// OpenTelemetry Attribute keys
public static final String CAMEL_CONTEXT_ATTRIBUTE = "camelContext";
public static final String ROUTE_ID_ATTRIBUTE = "routeId";
+ public static final String FAILED_ATTRIBUTE = "failed";
public static final String EVENT_TYPE_ATTRIBUTE = "eventType";
public static final String KIND_ATTRIBUTE = "kind";
+ public static final String ENDPOINT_NAME_ATTRIBUTE = "endpointName";
// OpenTelemetry Attribute values
+ public static final String KIND_EXCHANGE = "CamelExchangeEvent";
public static final String KIND_ROUTE = "CamelRoute";
private OpenTelemetryConstants() {
diff --git
a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java
new file mode 100644
index 000000000000..db269405f83d
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.opentelemetry.metrics.TaskTimer;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeFailedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeSentEvent;
+import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.SimpleEventNotifierSupport;
+
+public class OpenTelemetryExchangeEventNotifier extends EventNotifierSupport
implements CamelContextAware {
+
+ private static final AtomicLong lastExchangeTimestampHolder = new
AtomicLong(0);
+ private final Class<ExchangeEvent> eventType = ExchangeEvent.class;
+ private Meter meter;
+ boolean registerTemplates = true;
+ boolean registerKamelets;
+ private InflightRepository inflightRepository;
+
+ // Event Notifier options
+ private Predicate<Exchange> ignoreExchanges = exchange -> false;
+ private OpenTelemetryExchangeEventNotifierNamingStrategy namingStrategy
+ = OpenTelemetryExchangeEventNotifierNamingStrategy.DEFAULT;
+ boolean baseEndpointURI = true;
+ private TimeUnit durationUnit = TimeUnit.MILLISECONDS;
+ private TimeUnit lastExchangeUnit = TimeUnit.MILLISECONDS;
+
+ // Opentelemetry instruments
+ private final Map<String, ObservableLongGauge> inflightGauges = new
HashMap<>();
+ private ObservableLongGauge lastExchangeTimeGauge;
+ private LongHistogram elapsedTimer;
+ private LongHistogram sentTimer;
+
+ public OpenTelemetryExchangeEventNotifier() {
+ // no-op
+ }
+
+ // Use the base endpoint to avoid increasing the number of separate events
on dynamic endpoints (ie, toD).
+ public void setBaseEndpointURI(boolean baseEndpointURI) {
+ this.baseEndpointURI = baseEndpointURI;
+ }
+
+ public boolean isBaseEndpointURI() {
+ return baseEndpointURI;
+ }
+
+ public void setIgnoreExchanges(Predicate<Exchange> ignoreExchanges) {
+ this.ignoreExchanges = ignoreExchanges;
+ }
+
+ public Predicate<Exchange> getIgnoreExchanges() {
+ return ignoreExchanges;
+ }
+
+ public void setDurationUnit(TimeUnit durationUnit) {
+ this.durationUnit = durationUnit;
+ }
+
+ public TimeUnit getDurationUnit() {
+ return durationUnit;
+ }
+
+ public void setLastExchangeUnit(TimeUnit lastExchangeUnit) {
+ this.lastExchangeUnit = lastExchangeUnit;
+ }
+
+ public TimeUnit getLastExchangeUnit() {
+ return lastExchangeUnit;
+ }
+
+ public void
setNamingStrategy(OpenTelemetryExchangeEventNotifierNamingStrategy
namingStrategy) {
+ this.namingStrategy = namingStrategy;
+ }
+
+ public OpenTelemetryExchangeEventNotifierNamingStrategy
getNamingStrategy() {
+ return namingStrategy;
+ }
+
+ public void setMeter(Meter meter) {
+ this.meter = meter;
+ }
+
+ public Meter getMeter() {
+ return meter;
+ }
+
+ @Override
+ public boolean isEnabled(CamelEvent eventObject) {
+ return eventType.isAssignableFrom(eventObject.getClass());
+ }
+
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ if (meter == null) {
+ this.meter =
CamelContextHelper.findSingleByType(getCamelContext(), Meter.class);
+ }
+ if (meter == null) {
+ this.meter = GlobalOpenTelemetry.get().getMeter("camel");
+ }
+ if (meter == null) {
+ throw new RuntimeCamelException("Could not find any OpenTelemetry
meter!");
+ }
+ ManagementStrategy ms = getCamelContext().getManagementStrategy();
+ if (ms != null && ms.getManagementAgent() != null) {
+ registerKamelets =
ms.getManagementAgent().getRegisterRoutesCreateByKamelet();
+ registerTemplates =
ms.getManagementAgent().getRegisterRoutesCreateByTemplate();
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ inflightRepository = getCamelContext().getInflightRepository();
+
+ // need to be able to add/remove meter accordingly to route changes
+ getCamelContext().getManagementStrategy().addEventNotifier(new
SimpleEventNotifierSupport() {
+ @Override
+ public void notify(CamelEvent event) {
+ if (event instanceof CamelEvent.RouteAddedEvent addedEvent) {
+ addInFlightMessageGauge(addedEvent.getRoute());
+ } else if (event instanceof CamelEvent.RouteRemovedEvent
removedEvent) {
+ removeInFlightGauge(removedEvent.getRoute().getRouteId());
+ }
+ }
+ });
+
+ this.elapsedTimer = meter
+ .histogramBuilder(getNamingStrategy().getElapsedTimerName())
+ .setDescription("Time taken to complete exchange")
+ .setUnit(durationUnit.name().toLowerCase())
+ .ofLongs().build();
+
+ this.sentTimer = meter
+ .histogramBuilder(getNamingStrategy().getSentTimerName())
+ .setDescription("Time taken to send message to the endpoint")
+ .setUnit(durationUnit.name().toLowerCase())
+ .ofLongs().build();
+
+ this.lastExchangeTimeGauge = meter
+ .gaugeBuilder(getNamingStrategy().getLastProcessedTimeName())
+ .setDescription("Last exchange processed time since the Unix
epoch")
+ .ofLongs()
+ .setUnit(lastExchangeUnit.name().toLowerCase())
+ .buildWithCallback(
+ observableMeasurement -> {
+ observableMeasurement.record(
+
lastExchangeUnit.convert(lastExchangeTimestampHolder.get(),
TimeUnit.MILLISECONDS));
+ });
+
+ // add existing routes
+ for (Route route : getCamelContext().getRoutes()) {
+ addInFlightMessageGauge(route);
+ }
+ }
+
+ private void addInFlightMessageGauge(Route route) {
+ boolean skip = (route.isCreatedByKamelet() && !registerKamelets)
+ || (route.isCreatedByRouteTemplate() && !registerTemplates);
+ if (!skip) {
+ String routeId = route.getRouteId();
+ String name = getNamingStrategy().getInflightExchangesName();
+ Attributes attributes =
getNamingStrategy().getInflightExchangesAttributes(getCamelContext(), routeId);
+ ObservableLongGauge asyncGauge = meter.gaugeBuilder(name)
+ .setDescription("Route in flight messages")
+ .ofLongs()
+ .buildWithCallback(
+ observableMeasurement -> {
+
observableMeasurement.record(inflightRepository.size(routeId), attributes);
+ });
+ inflightGauges.put(routeId, asyncGauge);
+ }
+ }
+
+ private void removeInFlightGauge(String routeId) {
+ ObservableLongGauge gauge = inflightGauges.remove(routeId);
+ if (gauge != null) {
+ gauge.close();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ inflightGauges.values().forEach(ObservableLongGauge::close);
+ inflightGauges.clear();
+ if (lastExchangeTimeGauge != null) {
+ lastExchangeTimeGauge.close();
+ }
+ }
+
+ @Override
+ public void notify(CamelEvent eventObject) {
+ if (eventObject instanceof ExchangeEvent exchangeEvent) {
+ // skip routes that should not be included
+ boolean skip = false;
+ String routeId;
+ Exchange exchange = exchangeEvent.getExchange();
+ if (eventObject instanceof ExchangeCreatedEvent) {
+ routeId = exchange.getFromRouteId();
+ } else {
+ routeId = ExchangeHelper.getAtRouteId(exchange);
+ }
+ if (routeId != null) {
+ Route route = exchange.getContext().getRoute(routeId);
+ if (route != null) {
+ skip = (route.isCreatedByKamelet() && !registerKamelets)
+ || (route.isCreatedByRouteTemplate() &&
!registerTemplates);
+ }
+ }
+ if (skip) {
+ return;
+ }
+ if (!(getIgnoreExchanges().test(exchange))) {
+ if (eventObject instanceof ExchangeCreatedEvent createdEvent) {
+ handleCreatedEvent(createdEvent);
+ } else if (eventObject instanceof ExchangeSentEvent sentEvent)
{
+ handleSentEvent(sentEvent);
+ } else if (eventObject instanceof ExchangeCompletedEvent
+ || eventObject instanceof ExchangeFailedEvent) {
+ handleDoneEvent(exchangeEvent);
+ }
+ }
+ }
+ }
+
+ protected void handleSentEvent(ExchangeSentEvent sentEvent) {
+ Attributes attributes = getNamingStrategy().getAttributes(sentEvent,
sentEvent.getEndpoint(), isBaseEndpointURI());
+ this.sentTimer.record(durationUnit.convert(sentEvent.getTimeTaken(),
TimeUnit.MILLISECONDS), attributes);
+ }
+
+ protected void handleCreatedEvent(ExchangeCreatedEvent createdEvent) {
+ String name = getNamingStrategy().getElapsedTimerName();
+ createdEvent.getExchange().setProperty("elapsedTimer:" + name, new
TaskTimer());
+ }
+
+ protected void handleDoneEvent(ExchangeEvent doneEvent) {
+ String name = getNamingStrategy().getElapsedTimerName();
+ TaskTimer task = (TaskTimer)
doneEvent.getExchange().removeProperty("elapsedTimer:" + name);
+ if (task != null) {
+ Attributes attributes = getNamingStrategy().getAttributes(
+ doneEvent, doneEvent.getExchange().getFromEndpoint(),
isBaseEndpointURI());
+ this.elapsedTimer.record(task.duration(durationUnit), attributes);
+ }
+ lastExchangeTimestampHolder.set(System.currentTimeMillis());
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java
new file mode 100644
index 000000000000..78ee1ca0e6b6
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.CamelEvent.ExchangeEvent;
+import org.apache.camel.util.StringHelper;
+
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.CAMEL_CONTEXT_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.EVENT_TYPE_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.FAILED_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_EXCHANGE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ROUTE_ID_ATTRIBUTE;
+
+public interface OpenTelemetryExchangeEventNotifierNamingStrategy {
+
+ /**
+ * Default naming strategy.
+ */
+ OpenTelemetryExchangeEventNotifierNamingStrategy DEFAULT = new
OpenTelemetryExchangeEventNotifierNamingStrategy() {
+ @Override
+ public String getSentTimerName() {
+ return DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+ }
+
+ @Override
+ public String getElapsedTimerName() {
+ return DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+ }
+
+ @Override
+ public String getInflightExchangesName() {
+ return DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+ }
+
+ @Override
+ public String getLastProcessedTimeName() {
+ return DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+ }
+ };
+
+ String getSentTimerName();
+
+ String getElapsedTimerName();
+
+ String getInflightExchangesName();
+
+ String getLastProcessedTimeName();
+
+ default Attributes getAttributes(ExchangeEvent event, Endpoint endpoint,
boolean isBaseEndpointURI) {
+ String uri = "";
+ if (endpoint != null) {
+ uri = endpoint.toString();
+ if (isBaseEndpointURI) {
+ uri = StringHelper.before(uri, "?", uri);
+ }
+ }
+ Exchange exchange = event.getExchange();
+ String routeId = exchange.getFromRouteId();
+
+ AttributesBuilder builder = Attributes.builder();
+ builder.put(AttributeKey.stringKey(CAMEL_CONTEXT_ATTRIBUTE),
exchange.getContext().getName())
+ .put(AttributeKey.stringKey(KIND_ATTRIBUTE), KIND_EXCHANGE)
+ .put(AttributeKey.stringKey(EVENT_TYPE_ATTRIBUTE),
event.getClass().getSimpleName())
+ .put(AttributeKey.stringKey(ENDPOINT_NAME_ATTRIBUTE), uri)
+ .put(AttributeKey.stringKey(FAILED_ATTRIBUTE),
Boolean.toString(exchange.isFailed()));
+ if (routeId != null) {
+ builder.put(AttributeKey.stringKey(ROUTE_ID_ATTRIBUTE), routeId);
+ }
+ return builder.build();
+ }
+
+ default Attributes getInflightExchangesAttributes(CamelContext
camelContext, String routeId) {
+ return Attributes.of(
+ AttributeKey.stringKey(CAMEL_CONTEXT_ATTRIBUTE), camelContext
== null ? "" : camelContext.getName(),
+ AttributeKey.stringKey(KIND_ATTRIBUTE), KIND_EXCHANGE,
+ AttributeKey.stringKey(ROUTE_ID_ATTRIBUTE), routeId);
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
index 06c32d7a8470..9dcda85227f3 100644
---
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
@@ -49,9 +49,9 @@ public class AbstractOpenTelemetryTest extends
CamelTestSupport {
return pointDataList.get(0);
}
- protected LongPointData getSingleLongPointData(String meterName, String
routeId) {
- List<PointData> pdList = getAllPointDataForRouteId(meterName, routeId);
- assertEquals(1, pdList.size(), "Should have one metric for routeId " +
routeId + " and meterName " + meterName);
+ protected LongPointData getSingleLongPointData(String metricName, String
routeId) {
+ List<PointData> pdList = getAllPointDataForRouteId(metricName,
routeId);
+ assertEquals(1, pdList.size(), "Should have one metric for routeId " +
routeId + " and metricName " + metricName);
PointData pd = pdList.get(0);
assertInstanceOf(LongPointData.class, pd);
return (LongPointData) pd;
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java
new file mode 100644
index 000000000000..d91f575e098c
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.Map;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultProducer;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Test OpenTelemetryExchangeEventNotifier with dynamic endpoint URIs.
+ */
+public class OpenTelemetryExchangeEventNotifierDynamicTest extends
AbstractOpenTelemetryTest {
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ OpenTelemetryExchangeEventNotifier eventNotifier = getEventNotifier();
+ context.getManagementStrategy().addEventNotifier(eventNotifier);
+ eventNotifier.init();
+ return context;
+ }
+
+ protected OpenTelemetryExchangeEventNotifier getEventNotifier() {
+ OpenTelemetryExchangeEventNotifier eventNotifier = new
OpenTelemetryExchangeEventNotifier();
+
eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+ // create metrics for each dynamic endpoint URI
+ eventNotifier.setBaseEndpointURI(false);
+ return eventNotifier;
+ }
+
+ @Test
+ public void testEventNotifier() throws Exception {
+ int count = 10;
+ MockEndpoint mock = getMockEndpoint("mock://out");
+ mock.expectedMessageCount(count);
+
+ for (int i = 0; i < count; i++) {
+ template.sendBody("direct://in", i);
+ }
+
+ mock.assertIsSatisfied();
+
+ int nameCount = 0;
+ for (PointData pd :
getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, "test")) {
+ String name =
pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+ // should have recorded metrics for each dynamic endpoint name,
e.g. mc://component?clear=val-0&password=xxxxxx
+ if (name != null && name.startsWith("mc://component")) {
+ nameCount++;
+ assertInstanceOf(HistogramPointData.class, pd);
+ HistogramPointData hpd = (HistogramPointData) pd;
+ assertEquals(1, hpd.getCount());
+ }
+ }
+ assertEquals(count, nameCount, "number of 'mc://component' endpoints
should equal the number of exchanges.");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct://in")
+ .routeId("test")
+
.toD("mc:component?clear=val-${body}&password=secret-${body}")
+ .to("mock://out");
+ }
+ };
+ }
+
+ @Override
+ protected void bindToRegistry(Registry registry) {
+ registry.bind("mc", new MyComponent());
+ }
+
+ private class MyComponent extends DefaultComponent {
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining,
Map<String, Object> parameters) {
+ return new MyEndpoint(uri, this, parameters);
+ }
+ }
+
+ private class MyEndpoint extends DefaultEndpoint {
+ private final String password;
+ private final String clear;
+
+ MyEndpoint(String uri, MyComponent myComponent, Map<String, Object>
parameters) {
+ super(uri, myComponent);
+ this.clear = parameters.remove("clear").toString();
+ this.password = parameters.remove("password").toString();
+ }
+
+ @Override
+ public Producer createProducer() {
+ return new DefaultProducer(this) {
+ @Override
+ public void process(Exchange exchange) {
+ // noop
+ }
+ };
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java
new file mode 100644
index 000000000000..332b96ea3203
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.apache.camel.support.ExpressionAdapter;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.EVENT_TYPE_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.FAILED_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_ATTRIBUTE;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ROUTE_ID_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryExchangeEventNotifierTest extends
AbstractOpenTelemetryTest {
+
+ private static final Long DELAY = 250L;
+ private static final Long TOLERANCE = 100L;
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ OpenTelemetryExchangeEventNotifier eventNotifier = new
OpenTelemetryExchangeEventNotifier();
+
eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+ context.getManagementStrategy().addEventNotifier(eventNotifier);
+ eventNotifier.init();
+ return context;
+ }
+
+ // verify that the 'inflight' gauge is registered and working
+ @Test
+ public void testCamelInflightInstrument() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(1);
+
+ // verify gauges registered for both routes
+ assertEquals(0, inFlightExchangesForRoute("foo"));
+ assertEquals(0, inFlightExchangesForRoute("bar"));
+
+ // verify we have an 'in flight' instrument for each route
+ assertEquals(2,
getAllPointData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT).size());
+
+ verifyInflightExchange(mock, () -> {
+ assertEquals(1L, inFlightExchangesForRoute("foo"));
+ });
+
+ template.sendBody("direct:foo", "Hello");
+
+ assertEquals(0, inFlightExchangesForRoute("foo"));
+ assertEquals(0, inFlightExchangesForRoute("bar"));
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testElapsedTimerEvents() throws Exception {
+ int count = 10;
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(count);
+ for (int i = 0; i < count; i++) {
+ if (i % 2 == 0) {
+ template.sendBody("direct:foo", "Hello " + i);
+ } else {
+ template.sendBody("direct:bar", "Hello " + i);
+ }
+ }
+
+ mock.assertIsSatisfied();
+
+ verifyElapsedTimerHistogramMetric("bar", DELAY, count / 2);
+ verifyElapsedTimerHistogramMetric("foo", 0, count / 2);
+ }
+
+ @Test
+ public void testSentTimerEvents() throws Exception {
+ int count = 10;
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(count);
+ for (int i = 0; i < count; i++) {
+ if (i % 2 == 0) {
+ template.sendBody("direct:foo", "Hello " + i);
+ } else {
+ template.sendBody("direct:bar", "Hello " + i);
+ }
+ }
+
+ mock.assertIsSatisfied();
+
+ verifySentTimerHistogramMetric("bar", DELAY, count / 2);
+ verifySentTimerHistogramMetric("foo", 0, count / 2);
+ }
+
+ @Test
+ public void testMetricData() {
+ template.sendBody("direct:bar", "Hello");
+
+ List<MetricData> ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+ assertEquals(1, ls.size());
+ MetricData md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.elapsed", md.getName());
+ assertEquals("Time taken to complete exchange", md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.sent", md.getName());
+ assertEquals("Time taken to send message to the endpoint",
md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.inflight", md.getName());
+ assertEquals("Route in flight messages", md.getDescription());
+
+ ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.last.time", md.getName());
+ assertEquals("Last exchange processed time since the Unix epoch",
md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+ }
+
+ @Test
+ public void testMetricMultipleRouteData() {
+ template.sendBody("direct:foo", "Hello");
+ template.sendBody("direct:bar", "Hello");
+
+ List<MetricData> ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+ assertEquals(1, ls.size());
+ MetricData md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.elapsed", md.getName());
+ assertEquals("Time taken to complete exchange", md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.sent", md.getName());
+ assertEquals("Time taken to send message to the endpoint",
md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.inflight", md.getName());
+ assertEquals("Route in flight messages", md.getDescription());
+
+ ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.last.time", md.getName());
+ assertEquals("Last exchange processed time since the Unix epoch",
md.getDescription());
+ assertEquals("milliseconds", md.getUnit());
+ }
+
+ @Test
+ public void testLastTimeInstrument() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:foo", "Hello");
+ Thread.sleep(DELAY);
+
+ long diff = System.currentTimeMillis() - getLastTimeInstrument();
+ assertTrue(diff >= DELAY && diff < DELAY + TOLERANCE, "last time
instrument");
+ }
+
+ private void verifyElapsedTimerHistogramMetric(String routeId, long delay,
int msgCount) {
+ PointData pd =
getPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER, routeId);
+ assertEquals(routeId,
pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+ assertEquals("direct://" + routeId,
pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE)));
+ assertEquals("CamelExchangeEvent",
pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+ assertEquals("ExchangeCompletedEvent",
pd.getAttributes().get(stringKey(EVENT_TYPE_ATTRIBUTE)));
+ assertEquals("false",
pd.getAttributes().get(stringKey(FAILED_ATTRIBUTE)));
+
+ // verify instrument recorded values
+ assertInstanceOf(HistogramPointData.class, pd);
+ HistogramPointData hpd = (HistogramPointData) pd;
+ assertTrue(hpd.getMax() < delay + TOLERANCE, "max value");
+ assertTrue(hpd.getMin() >= delay, "min value");
+ assertEquals(msgCount, hpd.getCount(), "count");
+ assertTrue(hpd.getSum() >= msgCount * delay, "sum");
+ }
+
+ private void verifySentTimerHistogramMetric(String routeId, long delay,
int msgCount) {
+ List<PointData> ls =
getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, routeId);
+ assertEquals(2, ls.size());
+ for (PointData pd : ls) {
+ String endPoint =
pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+ assertNotNull(endPoint);
+ if (("direct://" + routeId).equals(endPoint)) {
+ verifySentTimerHistogramMetric(pd, delay + TOLERANCE, delay,
msgCount);
+ } else if ("mock://result".equals(endPoint)) {
+ verifySentTimerHistogramMetric(pd, msgCount, 0, msgCount);
+ } else {
+ throw new IllegalStateException("Unexpected endpoint name: " +
endPoint);
+ }
+ assertEquals(routeId,
pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+ assertEquals("CamelExchangeEvent",
pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+ assertEquals("ExchangeSentEvent",
pd.getAttributes().get(stringKey(EVENT_TYPE_ATTRIBUTE)));
+ assertEquals("false",
pd.getAttributes().get(stringKey(FAILED_ATTRIBUTE)));
+ }
+ }
+
+ private long getLastTimeInstrument() {
+ List<PointData> ls =
getAllPointData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+ assertEquals(1, ls.size());
+ PointData pd = ls.get(0);
+ assertInstanceOf(LongPointData.class, pd);
+ return ((LongPointData) pd).getValue();
+ }
+
+ private void verifyInflightExchange(MockEndpoint mock, ThrowingRunnable
tr) {
+ mock.returnReplyBody(new ExpressionAdapter() {
+ @Override
+ public Object evaluate(Exchange exchange) {
+ try {
+ Awaitility.await().pollDelay(DELAY, TimeUnit.MILLISECONDS)
+ .catchUncaughtExceptions().untilAsserted(tr);
+ return exchange.getIn().getBody();
+ } catch (Exception e) {
+ if (e.getCause() instanceof InterruptedException) {
+ throw new CamelExecutionException(e.getMessage(),
exchange, e);
+ } else {
+ throw new RuntimeException("Unexpected Exception");
+ }
+ }
+ }
+ });
+ }
+
+ private long inFlightExchangesForRoute(String routeId) {
+ PointData pd =
getPointDataForRouteId(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT, routeId);
+ assertInstanceOf(LongPointData.class, pd, "Expected LongPointData");
+ assertEquals(routeId,
pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+ assertEquals("CamelExchangeEvent",
pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+ return ((LongPointData) pd).getValue();
+ }
+
+ private void verifySentTimerHistogramMetric(PointData pd, long max, long
min, int msgCount) {
+ assertInstanceOf(HistogramPointData.class, pd);
+ HistogramPointData hpd = (HistogramPointData) pd;
+ assertTrue(hpd.getMax() < max, "max value");
+ assertTrue(hpd.getMin() >= min, "min value");
+ assertEquals(msgCount, hpd.getCount(), "count");
+ assertTrue(hpd.getSum() >= msgCount * min, "sum");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct://foo").routeId("foo").to("mock://result");
+
from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+ }
+ };
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java
new file mode 100644
index 000000000000..81f7138f7d8e
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryExchangeEventNotifierTimeUnitTest extends
AbstractOpenTelemetryTest {
+
+ private static final Long DELAY = 1100L;
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ OpenTelemetryExchangeEventNotifier eventNotifier = new
OpenTelemetryExchangeEventNotifier();
+
eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+
+ // override default time unit from milliseconds to seconds
+ eventNotifier.setDurationUnit(TimeUnit.SECONDS);
+ eventNotifier.setLastExchangeUnit(TimeUnit.SECONDS);
+
+ context.getManagementStrategy().addEventNotifier(eventNotifier);
+ eventNotifier.init();
+ return context;
+ }
+
+ @Test
+ public void testElapsedTimerEvents() throws Exception {
+ int count = 3;
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(count);
+
+ for (int i = 0; i < count; i++) {
+ template.sendBody("direct:bar", "Hello " + i);
+ }
+
+ mock.assertIsSatisfied();
+ verifyElapsedTimerHistogramMetric("bar", Math.floorDiv(DELAY, 1000L),
count);
+ }
+
+ private void verifyElapsedTimerHistogramMetric(String routeId, long delay,
int msgCount) {
+ PointData pd =
getPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER, routeId);
+ assertInstanceOf(HistogramPointData.class, pd);
+ HistogramPointData hpd = (HistogramPointData) pd;
+ // histogram values are in seconds
+ assertTrue(hpd.getMax() == delay, "max value");
+ assertTrue(hpd.getMin() == delay, "min value");
+ assertTrue(hpd.getSum() >= msgCount * delay, "sum");
+ assertEquals(msgCount, hpd.getCount(), "count");
+ }
+
+ @Test
+ public void testMetricData() {
+ template.sendBody("direct:bar", "Hello");
+
+ List<MetricData> ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+ assertEquals(1, ls.size());
+ MetricData md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.elapsed", md.getName());
+ assertEquals("Time taken to complete exchange", md.getDescription());
+ // time unit should be in seconds as configured
+ assertEquals("seconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.HISTOGRAM, md.getType());
+ assertEquals("camel.exchange.sent", md.getName());
+ assertEquals("Time taken to send message to the endpoint",
md.getDescription());
+ // time unit should be in seconds as configured
+ assertEquals("seconds", md.getUnit());
+
+ ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.inflight", md.getName());
+ assertEquals("Route in flight messages", md.getDescription());
+
+ ls =
getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+ assertEquals(1, ls.size());
+ md = ls.get(0);
+ assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+ assertEquals("camel.exchanges.last.time", md.getName());
+ assertEquals("Last exchange processed time since the Unix epoch",
md.getDescription());
+ // time unit should be in seconds as configured
+ assertEquals("seconds", md.getUnit());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+ }
+ };
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java
new file mode 100644
index 000000000000..5e5dffb880eb
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+public class OpenTelemetryExchangeNotifierStaticTest extends
OpenTelemetryExchangeEventNotifierDynamicTest {
+
+ @Override
+ protected OpenTelemetryExchangeEventNotifier getEventNotifier() {
+ OpenTelemetryExchangeEventNotifier eventNotifier = new
OpenTelemetryExchangeEventNotifier();
+
eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+ // create single metrics for URI base endpoint, i.e. without query
parameters
+ eventNotifier.setBaseEndpointURI(true);
+ return eventNotifier;
+ }
+
+ @Test
+ public void testEventNotifier() throws Exception {
+ int count = 10;
+ MockEndpoint mock = getMockEndpoint("mock://out");
+ mock.expectedMessageCount(count);
+
+ for (int i = 0; i < count; i++) {
+ template.sendBody("direct://in", i);
+ }
+
+ mock.assertIsSatisfied();
+
+ int nameCount = 0;
+ for (PointData pd :
getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, "test")) {
+ String name =
pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+ if (name != null && name.startsWith("mc://component")) {
+ nameCount++;
+ assertInstanceOf(HistogramPointData.class, pd);
+ HistogramPointData hpd = (HistogramPointData) pd;
+ assertEquals(count, hpd.getCount());
+ }
+ }
+ assertEquals(1, nameCount, "Only one measure should be present for
'mc://component' endpoint.");
+ }
+}
diff --git
a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java
new file mode 100644
index 000000000000..a1cf31b58a2c
--- /dev/null
+++
b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.integration.eventnotifier;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.sdk.metrics.data.GaugeData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import
org.apache.camel.opentelemetry.metrics.eventnotifier.OpenTelemetryExchangeEventNotifier;
+import org.apache.camel.opentelemetry.metrics.integration.MemoryLogHandler;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test auto-configuration of OpenTelemetryExchangeEventNotifier relying on
OpenTelemetry global autoconfigure.
+ */
+public class ExchangeEventNotifierAutoConfigIT extends CamelTestSupport {
+
+ private static final Long DELAY = 250L;
+
+ @BeforeAll
+ public static void init() {
+ GlobalOpenTelemetry.resetForTest();
+ // open telemetry auto configuration using console exporter that
writes to logging
+ System.setProperty("otel.java.global-autoconfigure.enabled", "true");
+ System.setProperty("otel.metrics.exporter", "console");
+ System.setProperty("otel.traces.exporter", "none");
+ System.setProperty("otel.logs.exporter", "none");
+ System.setProperty("otel.propagators", "tracecontext");
+ System.setProperty("otel.metric.export.interval", "50");
+ }
+
+ @AfterEach
+ void cleanup() {
+ GlobalOpenTelemetry.resetForTest();
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ // not setting any meter explicitly, relying on opentelemetry
autoconfigure
+ OpenTelemetryExchangeEventNotifier eventNotifier = new
OpenTelemetryExchangeEventNotifier();
+ context.getManagementStrategy().addEventNotifier(eventNotifier);
+ eventNotifier.init();
+ return context;
+ }
+
+ @Test
+ public void testElapsedTimerEvents() throws Exception {
+ Logger logger =
Logger.getLogger(LoggingMetricExporter.class.getName());
+ MemoryLogHandler handler = new MemoryLogHandler();
+ logger.addHandler(handler);
+
+ int count = 6;
+ MockEndpoint mock = getMockEndpoint("mock://result");
+ mock.expectedMessageCount(count);
+
+ for (int i = 0; i < count; i++) {
+ if (i % 2 == 0) {
+ template.sendBody("direct:foo", "Hello " + i);
+ } else {
+ template.sendBody("direct:bar", "Hello " + i);
+ }
+ }
+
+ mock.assertIsSatisfied();
+
+ await().atMost(Duration.ofMillis(1000L)).until(() ->
!handler.getLogs().isEmpty());
+
+ List<LogRecord> logs = new ArrayList<>(handler.getLogs());
+ Map<String, Integer> counts = new HashMap<>();
+ for (LogRecord log : logs) {
+ if (log.getParameters() != null && log.getParameters().length > 0)
{
+ MetricData metricData = (MetricData) log.getParameters()[0];
+ counts.compute(metricData.getName(), (k, v) -> v == null ? 1 :
v + 1);
+ switch (metricData.getName()) {
+ case DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER,
+ DEFAULT_CAMEL_EXCHANGE_SENT_TIMER -> {
+ // histogram
+ assertInstanceOf(HistogramData.class,
metricData.getData());
+ }
+ case DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT,
+ DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT -> {
+ // gauge
+ assertInstanceOf(GaugeData.class,
metricData.getData());
+ }
+ default -> fail();
+ }
+ }
+ }
+ assertEquals(4, counts.size());
+ assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER) > 0,
+ "Should have metric log for " +
DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+ assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER) > 0,
+ "Should have metric log for " +
DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+
assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT) >
0,
+ "Should have metric log for " +
DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+ assertTrue(counts.get(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT) > 0,
+ "Should have metric log for " +
DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct://foo").routeId("foo").to("mock://result");
+
from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+ }
+ };
+ }
+}