This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e93c96c1a37b CAMEL-16866 add exchange event-notifier (#19886)
e93c96c1a37b is described below

commit e93c96c1a37b0240fe458b9b4021d1054092cd2b
Author: Jono Morris <[email protected]>
AuthorDate: Wed Nov 12 00:34:13 2025 +1300

    CAMEL-16866 add exchange event-notifier (#19886)
---
 .../main/docs/opentelemetry-metrics-component.adoc |  44 ++-
 .../metrics/OpenTelemetryConstants.java            |   9 +
 .../OpenTelemetryExchangeEventNotifier.java        | 284 +++++++++++++++++++
 ...lemetryExchangeEventNotifierNamingStrategy.java | 104 +++++++
 .../metrics/AbstractOpenTelemetryTest.java         |   6 +-
 ...nTelemetryExchangeEventNotifierDynamicTest.java | 142 ++++++++++
 .../OpenTelemetryExchangeEventNotifierTest.java    | 307 +++++++++++++++++++++
 ...TelemetryExchangeEventNotifierTimeUnitTest.java | 132 +++++++++
 .../OpenTelemetryExchangeNotifierStaticTest.java   |  65 +++++
 .../ExchangeEventNotifierAutoConfigIT.java         | 150 ++++++++++
 10 files changed, 1238 insertions(+), 5 deletions(-)

diff --git a/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc b/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
index cf3b38744b92..1c434d896401 100644
--- a/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
+++ b/components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc
@@ -61,7 +61,7 @@ OpenTelemetry meter configuration involves setting up a `MeterProvider` that is
 These `Meters` are then used to create instruments like counters, gauges or 
histograms.
 
 ==== How it works
-OpenTelemetry metrics are collected by instrumented application that record 
measurements using instruments like counters and gauges.
+OpenTelemetry metrics are collected by instrumented applications that record 
measurements using instruments like counters and gauges.
 These measurements are then processed by an OpenTelemetry SDK, which 
aggregates and batches them before exporting them,
 either directly or through the OpenTelemetry Collector. See 
https://opentelemetry.io/docs/languages/java/sdk/#sdkmeterprovider[SdkMeterProvider
 documentation]
 for details on how to configure this.
@@ -294,9 +294,14 @@ from("direct:in")
 
 === OpenTelemetry Event Notification
 
+There are event notifiers available for OpenTelemetry to capture Camel route 
and exchange events.
+The xref:#OpenTelemetryComponent-route-event-notifier[Route Event Notifier] 
counts the number of added and running routes,
+while the xref:#OpenTelemetryComponent-exchange-event-notifier[Exchange Event 
Notifier] times exchanges from their creation to their completion.
+
+[[OpenTelemetryComponent-route-event-notifier]]
 ==== Camel Route Event Notifier
 
-The Route Event Notifier counts the number of added and running routes, and 
can be added to the CamelContext as follows:
+A Route event notifier can be added to the CamelContext as follows:
 
 [source,java]
 ----
@@ -321,6 +326,41 @@ The following options are supported:
 | namingStrategy | OpenTelemetryRouteEventNotifierNamingStrategy.DEFAULT | The 
strategy to use for overriding default metric names.
 |=======================================================================
 
+[[OpenTelemetryComponent-exchange-event-notifier]]
+==== Camel Exchange Event Notifier
+
+An Exchange event notifier can be added to the CamelContext as follows:
+
+[source,java]
+----
+camelContext.getManagementStrategy().addEventNotifier(new 
OpenTelemetryExchangeEventNotifier());
+----
+
+Camel specific metrics that are available via the 
`OpenTelemetryExchangeEventNotifier`:
+
+[width="100%",options="header"]
+|=====================================================
+|Default Name |Type |Unit |Description
+|camel.exchange.sent | LongHistogram | TimeUnit.MILLISECONDS | Time taken to 
send a message to the endpoint
+|camel.exchange.elapsed | LongHistogram | TimeUnit.MILLISECONDS | Time taken 
to complete exchange
+|camel.exchanges.last.time | ObservableLongGauge | TimeUnit.MILLISECONDS | 
Last exchange processed time since the Unix epoch
+|camel.exchanges.inflight | ObservableLongGauge | Long | Number of in flight 
messages per route
+|=====================================================
+
+The following options are supported:
+
+[width="100%",options="header"]
+|=======================================================================
+|Name |Default |Description
+| ignoreExchanges | false | A predicate allowing certain exchanges to be 
ignored from metrics capture.
+| namingStrategy | OpenTelemetryExchangeEventNotifierNamingStrategy.DEFAULT | 
The strategy to use for overriding default metric names.
+| durationUnit | TimeUnit.MILLISECONDS | The time unit to use for exchange 
'sent' and 'elapsed' metrics.
+| lastExchangeUnit | TimeUnit.MILLISECONDS | The time unit to use for 'last 
time' metric.
+| baseEndpointURI | true | Whether to use static or dynamic values for 
Endpoint Name attributes in captured metrics.
+By default, static values are used. With dynamic values, a dynamic to (toD) can compute many different endpoint URIs,
+leading to high cardinality in metrics.
+|=======================================================================
+
 == OpenTelemetry Configuration
 
 Applications can export collected metrics to various backends using different 
exporters, including the OpenTelemetry Protocol (OTLP) exporter,
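
Note on the `baseEndpointURI` option documented in the hunk above: a minimal,
dependency-free sketch of why the default keeps metric cardinality low. It
assumes the endpoint name is the endpoint URI cut at the first `?`, which is
what `getAttributes` in this commit does through `StringHelper.before`. The
class and method names below (`EndpointNameSketch`, `endpointName`,
`distinctNames`) are hypothetical and not part of the commit.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class EndpointNameSketch {

    // Hypothetical helper: returns the full URI when baseEndpointURI is false,
    // otherwise drops the query string so dynamic parameters do not produce
    // a new attribute value for every exchange.
    public static String endpointName(String uri, boolean baseEndpointURI) {
        if (uri == null) {
            return "";
        }
        if (!baseEndpointURI) {
            return uri;
        }
        int query = uri.indexOf('?');
        return query < 0 ? uri : uri.substring(0, query);
    }

    // Counts the distinct endpoint names produced when a toD-style URI is
    // evaluated for message bodies 0..count-1.
    public static int distinctNames(int count, boolean baseEndpointURI) {
        Set<String> names = new LinkedHashSet<>();
        for (int body = 0; body < count; body++) {
            names.add(endpointName("mc://component?clear=val-" + body, baseEndpointURI));
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("static names:  " + distinctNames(10, true));
        System.out.println("dynamic names: " + distinctNames(10, false));
    }
}
```

With ten exchanges the static setting yields a single endpoint name while the
dynamic setting yields ten, which matches what
`OpenTelemetryExchangeEventNotifierDynamicTest` asserts for
`setBaseEndpointURI(false)`.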
diff --git a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
index 5514cc29af99..7a3592f0964c 100644
--- a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
+++ b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java
@@ -39,6 +39,12 @@ public class OpenTelemetryConstants {
 
     public static final String HEADER_METRIC_ATTRIBUTES = HEADER_PREFIX + 
"Attributes";
 
+    // Exchange-event metrics
+    public static final String DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT = 
"camel.exchanges.inflight";
+    public static final String DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER = 
"camel.exchange.elapsed";
+    public static final String DEFAULT_CAMEL_EXCHANGE_SENT_TIMER = 
"camel.exchange.sent";
+    public static final String 
DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT = 
"camel.exchanges.last.time";
+
     // Route-event metrics
     public static final String DEFAULT_CAMEL_ROUTES_ADDED = 
"camel.routes.added";
     public static final String DEFAULT_CAMEL_ROUTES_RUNNING = 
"camel.routes.running";
@@ -47,10 +53,13 @@ public class OpenTelemetryConstants {
     // OpenTelemetry Attribute keys
     public static final String CAMEL_CONTEXT_ATTRIBUTE = "camelContext";
     public static final String ROUTE_ID_ATTRIBUTE = "routeId";
+    public static final String FAILED_ATTRIBUTE = "failed";
     public static final String EVENT_TYPE_ATTRIBUTE = "eventType";
     public static final String KIND_ATTRIBUTE = "kind";
+    public static final String ENDPOINT_NAME_ATTRIBUTE = "endpointName";
 
     // OpenTelemetry Attribute values
+    public static final String KIND_EXCHANGE = "CamelExchangeEvent";
     public static final String KIND_ROUTE = "CamelRoute";
 
     private OpenTelemetryConstants() {
diff --git a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java
new file mode 100644
index 000000000000..db269405f83d
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifier.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.opentelemetry.metrics.TaskTimer;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeFailedEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeSentEvent;
+import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.SimpleEventNotifierSupport;
+
+public class OpenTelemetryExchangeEventNotifier extends EventNotifierSupport 
implements CamelContextAware {
+
+    private static final AtomicLong lastExchangeTimestampHolder = new 
AtomicLong(0);
+    private final Class<ExchangeEvent> eventType = ExchangeEvent.class;
+    private Meter meter;
+    boolean registerTemplates = true;
+    boolean registerKamelets;
+    private InflightRepository inflightRepository;
+
+    // Event Notifier options
+    private Predicate<Exchange> ignoreExchanges = exchange -> false;
+    private OpenTelemetryExchangeEventNotifierNamingStrategy namingStrategy
+            = OpenTelemetryExchangeEventNotifierNamingStrategy.DEFAULT;
+    boolean baseEndpointURI = true;
+    private TimeUnit durationUnit = TimeUnit.MILLISECONDS;
+    private TimeUnit lastExchangeUnit = TimeUnit.MILLISECONDS;
+
+    // OpenTelemetry instruments
+    private final Map<String, ObservableLongGauge> inflightGauges = new 
HashMap<>();
+    private ObservableLongGauge lastExchangeTimeGauge;
+    private LongHistogram elapsedTimer;
+    private LongHistogram sentTimer;
+
+    public OpenTelemetryExchangeEventNotifier() {
+        // no-op
+    }
+
+    // Use the base endpoint to avoid increasing the number of separate events on dynamic endpoints (i.e., toD).
+    public void setBaseEndpointURI(boolean baseEndpointURI) {
+        this.baseEndpointURI = baseEndpointURI;
+    }
+
+    public boolean isBaseEndpointURI() {
+        return baseEndpointURI;
+    }
+
+    public void setIgnoreExchanges(Predicate<Exchange> ignoreExchanges) {
+        this.ignoreExchanges = ignoreExchanges;
+    }
+
+    public Predicate<Exchange> getIgnoreExchanges() {
+        return ignoreExchanges;
+    }
+
+    public void setDurationUnit(TimeUnit durationUnit) {
+        this.durationUnit = durationUnit;
+    }
+
+    public TimeUnit getDurationUnit() {
+        return durationUnit;
+    }
+
+    public void setLastExchangeUnit(TimeUnit lastExchangeUnit) {
+        this.lastExchangeUnit = lastExchangeUnit;
+    }
+
+    public TimeUnit getLastExchangeUnit() {
+        return lastExchangeUnit;
+    }
+
+    public void 
setNamingStrategy(OpenTelemetryExchangeEventNotifierNamingStrategy 
namingStrategy) {
+        this.namingStrategy = namingStrategy;
+    }
+
+    public OpenTelemetryExchangeEventNotifierNamingStrategy 
getNamingStrategy() {
+        return namingStrategy;
+    }
+
+    public void setMeter(Meter meter) {
+        this.meter = meter;
+    }
+
+    public Meter getMeter() {
+        return meter;
+    }
+
+    @Override
+    public boolean isEnabled(CamelEvent eventObject) {
+        return eventType.isAssignableFrom(eventObject.getClass());
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        if (meter == null) {
+            this.meter = 
CamelContextHelper.findSingleByType(getCamelContext(), Meter.class);
+        }
+        if (meter == null) {
+            this.meter = GlobalOpenTelemetry.get().getMeter("camel");
+        }
+        if (meter == null) {
+            throw new RuntimeCamelException("Could not find any OpenTelemetry meter!");
+        }
+        ManagementStrategy ms = getCamelContext().getManagementStrategy();
+        if (ms != null && ms.getManagementAgent() != null) {
+            registerKamelets = 
ms.getManagementAgent().getRegisterRoutesCreateByKamelet();
+            registerTemplates = 
ms.getManagementAgent().getRegisterRoutesCreateByTemplate();
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        inflightRepository = getCamelContext().getInflightRepository();
+
+        // need to be able to add/remove meter according to route changes
+        getCamelContext().getManagementStrategy().addEventNotifier(new 
SimpleEventNotifierSupport() {
+            @Override
+            public void notify(CamelEvent event) {
+                if (event instanceof CamelEvent.RouteAddedEvent addedEvent) {
+                    addInFlightMessageGauge(addedEvent.getRoute());
+                } else if (event instanceof CamelEvent.RouteRemovedEvent 
removedEvent) {
+                    removeInFlightGauge(removedEvent.getRoute().getRouteId());
+                }
+            }
+        });
+
+        this.elapsedTimer = meter
+                .histogramBuilder(getNamingStrategy().getElapsedTimerName())
+                .setDescription("Time taken to complete exchange")
+                .setUnit(durationUnit.name().toLowerCase())
+                .ofLongs().build();
+
+        this.sentTimer = meter
+                .histogramBuilder(getNamingStrategy().getSentTimerName())
+                .setDescription("Time taken to send message to the endpoint")
+                .setUnit(durationUnit.name().toLowerCase())
+                .ofLongs().build();
+
+        this.lastExchangeTimeGauge = meter
+                .gaugeBuilder(getNamingStrategy().getLastProcessedTimeName())
+                .setDescription("Last exchange processed time since the Unix epoch")
+                .ofLongs()
+                .setUnit(lastExchangeUnit.name().toLowerCase())
+                .buildWithCallback(
+                        observableMeasurement -> {
+                            observableMeasurement.record(
+                                    
lastExchangeUnit.convert(lastExchangeTimestampHolder.get(), 
TimeUnit.MILLISECONDS));
+                        });
+
+        // add existing routes
+        for (Route route : getCamelContext().getRoutes()) {
+            addInFlightMessageGauge(route);
+        }
+    }
+
+    private void addInFlightMessageGauge(Route route) {
+        boolean skip = (route.isCreatedByKamelet() && !registerKamelets)
+                || (route.isCreatedByRouteTemplate() && !registerTemplates);
+        if (!skip) {
+            String routeId = route.getRouteId();
+            String name = getNamingStrategy().getInflightExchangesName();
+            Attributes attributes = 
getNamingStrategy().getInflightExchangesAttributes(getCamelContext(), routeId);
+            ObservableLongGauge asyncGauge = meter.gaugeBuilder(name)
+                    .setDescription("Route in flight messages")
+                    .ofLongs()
+                    .buildWithCallback(
+                            observableMeasurement -> {
+                                
observableMeasurement.record(inflightRepository.size(routeId), attributes);
+                            });
+            inflightGauges.put(routeId, asyncGauge);
+        }
+    }
+
+    private void removeInFlightGauge(String routeId) {
+        ObservableLongGauge gauge = inflightGauges.remove(routeId);
+        if (gauge != null) {
+            gauge.close();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        inflightGauges.values().forEach(ObservableLongGauge::close);
+        inflightGauges.clear();
+        if (lastExchangeTimeGauge != null) {
+            lastExchangeTimeGauge.close();
+        }
+    }
+
+    @Override
+    public void notify(CamelEvent eventObject) {
+        if (eventObject instanceof ExchangeEvent exchangeEvent) {
+            // skip routes that should not be included
+            boolean skip = false;
+            String routeId;
+            Exchange exchange = exchangeEvent.getExchange();
+            if (eventObject instanceof ExchangeCreatedEvent) {
+                routeId = exchange.getFromRouteId();
+            } else {
+                routeId = ExchangeHelper.getAtRouteId(exchange);
+            }
+            if (routeId != null) {
+                Route route = exchange.getContext().getRoute(routeId);
+                if (route != null) {
+                    skip = (route.isCreatedByKamelet() && !registerKamelets)
+                            || (route.isCreatedByRouteTemplate() && 
!registerTemplates);
+                }
+            }
+            if (skip) {
+                return;
+            }
+            if (!(getIgnoreExchanges().test(exchange))) {
+                if (eventObject instanceof ExchangeCreatedEvent createdEvent) {
+                    handleCreatedEvent(createdEvent);
+                } else if (eventObject instanceof ExchangeSentEvent sentEvent) 
{
+                    handleSentEvent(sentEvent);
+                } else if (eventObject instanceof ExchangeCompletedEvent
+                        || eventObject instanceof ExchangeFailedEvent) {
+                    handleDoneEvent(exchangeEvent);
+                }
+            }
+        }
+    }
+
+    protected void handleSentEvent(ExchangeSentEvent sentEvent) {
+        Attributes attributes = getNamingStrategy().getAttributes(sentEvent, 
sentEvent.getEndpoint(), isBaseEndpointURI());
+        this.sentTimer.record(durationUnit.convert(sentEvent.getTimeTaken(), 
TimeUnit.MILLISECONDS), attributes);
+    }
+
+    protected void handleCreatedEvent(ExchangeCreatedEvent createdEvent) {
+        String name = getNamingStrategy().getElapsedTimerName();
+        createdEvent.getExchange().setProperty("elapsedTimer:" + name, new 
TaskTimer());
+    }
+
+    protected void handleDoneEvent(ExchangeEvent doneEvent) {
+        String name = getNamingStrategy().getElapsedTimerName();
+        TaskTimer task = (TaskTimer) 
doneEvent.getExchange().removeProperty("elapsedTimer:" + name);
+        if (task != null) {
+            Attributes attributes = getNamingStrategy().getAttributes(
+                    doneEvent, doneEvent.getExchange().getFromEndpoint(), 
isBaseEndpointURI());
+            this.elapsedTimer.record(task.duration(durationUnit), attributes);
+        }
+        lastExchangeTimestampHolder.set(System.currentTimeMillis());
+    }
+}
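
Note on `durationUnit` and `lastExchangeUnit` in the notifier above: both
`handleSentEvent` and the last-exchange gauge callback convert a millisecond
value with `TimeUnit.convert`, which truncates toward zero, so choosing a unit
coarser than milliseconds discards sub-unit durations. The sketch below uses
only `java.util.concurrent.TimeUnit`; the class and method names
(`DurationUnitSketch`, `toRecordedValue`, `unitLabel`) are hypothetical and
not part of the commit.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

public class DurationUnitSketch {

    // Mirrors the conversion used for the 'sent' histogram:
    // the source value is always expressed in milliseconds.
    public static long toRecordedValue(long millis, TimeUnit durationUnit) {
        return durationUnit.convert(millis, TimeUnit.MILLISECONDS);
    }

    // Mirrors the label passed to setUnit(...). The commit calls toLowerCase()
    // without a locale; Locale.ROOT is used here only so that the sketch
    // behaves the same under every default locale.
    public static String unitLabel(TimeUnit unit) {
        return unit.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // 999 ms is less than one whole second, so it is recorded as 0.
        System.out.println(toRecordedValue(999, TimeUnit.SECONDS));
        // 1500 ms is truncated, not rounded, to 1 second.
        System.out.println(toRecordedValue(1500, TimeUnit.SECONDS));
        // Finer units keep full precision: 5 ms is 5000 microseconds.
        System.out.println(toRecordedValue(5, TimeUnit.MICROSECONDS));
        System.out.println(unitLabel(TimeUnit.SECONDS));
    }
}
```

For short-lived exchanges this suggests keeping the default
`TimeUnit.MILLISECONDS`, or a finer unit, unless second-level resolution is
really sufficient.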
diff --git a/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java
new file mode 100644
index 000000000000..78ee1ca0e6b6
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierNamingStrategy.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.CamelEvent.ExchangeEvent;
+import org.apache.camel.util.StringHelper;
+
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.CAMEL_CONTEXT_ATTRIBUTE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.EVENT_TYPE_ATTRIBUTE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.FAILED_ATTRIBUTE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_ATTRIBUTE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_EXCHANGE;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ROUTE_ID_ATTRIBUTE;
+
+public interface OpenTelemetryExchangeEventNotifierNamingStrategy {
+
+    /**
+     * Default naming strategy.
+     */
+    OpenTelemetryExchangeEventNotifierNamingStrategy DEFAULT = new 
OpenTelemetryExchangeEventNotifierNamingStrategy() {
+        @Override
+        public String getSentTimerName() {
+            return DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+        }
+
+        @Override
+        public String getElapsedTimerName() {
+            return DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+        }
+
+        @Override
+        public String getInflightExchangesName() {
+            return DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+        }
+
+        @Override
+        public String getLastProcessedTimeName() {
+            return DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+        }
+    };
+
+    String getSentTimerName();
+
+    String getElapsedTimerName();
+
+    String getInflightExchangesName();
+
+    String getLastProcessedTimeName();
+
+    default Attributes getAttributes(ExchangeEvent event, Endpoint endpoint, 
boolean isBaseEndpointURI) {
+        String uri = "";
+        if (endpoint != null) {
+            uri = endpoint.toString();
+            if (isBaseEndpointURI) {
+                uri = StringHelper.before(uri, "?", uri);
+            }
+        }
+        Exchange exchange = event.getExchange();
+        String routeId = exchange.getFromRouteId();
+
+        AttributesBuilder builder = Attributes.builder();
+        builder.put(AttributeKey.stringKey(CAMEL_CONTEXT_ATTRIBUTE), 
exchange.getContext().getName())
+                .put(AttributeKey.stringKey(KIND_ATTRIBUTE), KIND_EXCHANGE)
+                .put(AttributeKey.stringKey(EVENT_TYPE_ATTRIBUTE), 
event.getClass().getSimpleName())
+                .put(AttributeKey.stringKey(ENDPOINT_NAME_ATTRIBUTE), uri)
+                .put(AttributeKey.stringKey(FAILED_ATTRIBUTE), 
Boolean.toString(exchange.isFailed()));
+        if (routeId != null) {
+            builder.put(AttributeKey.stringKey(ROUTE_ID_ATTRIBUTE), routeId);
+        }
+        return builder.build();
+    }
+
+    default Attributes getInflightExchangesAttributes(CamelContext 
camelContext, String routeId) {
+        return Attributes.of(
+                AttributeKey.stringKey(CAMEL_CONTEXT_ATTRIBUTE), camelContext 
== null ? "" : camelContext.getName(),
+                AttributeKey.stringKey(KIND_ATTRIBUTE), KIND_EXCHANGE,
+                AttributeKey.stringKey(ROUTE_ID_ATTRIBUTE), routeId);
+    }
+}
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
index 06c32d7a8470..9dcda85227f3 100644
--- a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/AbstractOpenTelemetryTest.java
@@ -49,9 +49,9 @@ public class AbstractOpenTelemetryTest extends CamelTestSupport {
         return pointDataList.get(0);
     }
 
-    protected LongPointData getSingleLongPointData(String meterName, String 
routeId) {
-        List<PointData> pdList = getAllPointDataForRouteId(meterName, routeId);
-        assertEquals(1, pdList.size(), "Should have one metric for routeId " + 
routeId + " and meterName " + meterName);
+    protected LongPointData getSingleLongPointData(String metricName, String 
routeId) {
+        List<PointData> pdList = getAllPointDataForRouteId(metricName, 
routeId);
+        assertEquals(1, pdList.size(), "Should have one metric for routeId " + 
routeId + " and metricName " + metricName);
         PointData pd = pdList.get(0);
         assertInstanceOf(LongPointData.class, pd);
         return (LongPointData) pd;
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java
new file mode 100644
index 000000000000..d91f575e098c
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierDynamicTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.Map;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultProducer;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static 
org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Test OpenTelemetryExchangeEventNotifier with dynamic endpoint URIs.
+ */
+public class OpenTelemetryExchangeEventNotifierDynamicTest extends 
AbstractOpenTelemetryTest {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        OpenTelemetryExchangeEventNotifier eventNotifier = getEventNotifier();
+        context.getManagementStrategy().addEventNotifier(eventNotifier);
+        eventNotifier.init();
+        return context;
+    }
+
+    protected OpenTelemetryExchangeEventNotifier getEventNotifier() {
+        OpenTelemetryExchangeEventNotifier eventNotifier = new 
OpenTelemetryExchangeEventNotifier();
+        
eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+        // create metrics for each dynamic endpoint URI
+        eventNotifier.setBaseEndpointURI(false);
+        return eventNotifier;
+    }
+
+    @Test
+    public void testEventNotifier() throws Exception {
+        int count = 10;
+        MockEndpoint mock = getMockEndpoint("mock://out");
+        mock.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+            template.sendBody("direct://in", i);
+        }
+
+        mock.assertIsSatisfied();
+
+        int nameCount = 0;
+        for (PointData pd : 
getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, "test")) {
+            String name = 
pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+            // should have recorded metrics for each dynamic endpoint name, e.g. mc://component?clear=val-0&password=xxxxxx
+            if (name != null && name.startsWith("mc://component")) {
+                nameCount++;
+                assertInstanceOf(HistogramPointData.class, pd);
+                HistogramPointData hpd = (HistogramPointData) pd;
+                assertEquals(1, hpd.getCount());
+            }
+        }
+        assertEquals(count, nameCount, "number of 'mc://component' endpoints should equal the number of exchanges.");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct://in")
+                        .routeId("test")
+                        .toD("mc:component?clear=val-${body}&password=secret-${body}")
+                        .to("mock://out");
+            }
+        };
+    }
+
+    @Override
+    protected void bindToRegistry(Registry registry) {
+        registry.bind("mc", new MyComponent());
+    }
+
+    private class MyComponent extends DefaultComponent {
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) {
+            return new MyEndpoint(uri, this, parameters);
+        }
+    }
+
+    private class MyEndpoint extends DefaultEndpoint {
+        private final String password;
+        private final String clear;
+
+        MyEndpoint(String uri, MyComponent myComponent, Map<String, Object> parameters) {
+            super(uri, myComponent);
+            this.clear = parameters.remove("clear").toString();
+            this.password = parameters.remove("password").toString();
+        }
+
+        @Override
+        public Producer createProducer() {
+            return new DefaultProducer(this) {
+                @Override
+                public void process(Exchange exchange) {
+                    // noop
+                }
+            };
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java
new file mode 100644
index 000000000000..332b96ea3203
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.apache.camel.support.ExpressionAdapter;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.EVENT_TYPE_ATTRIBUTE;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.FAILED_ATTRIBUTE;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_ATTRIBUTE;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ROUTE_ID_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryExchangeEventNotifierTest extends AbstractOpenTelemetryTest {
+
+    private static final Long DELAY = 250L;
+    private static final Long TOLERANCE = 100L;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        OpenTelemetryExchangeEventNotifier eventNotifier = new OpenTelemetryExchangeEventNotifier();
+        eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+        context.getManagementStrategy().addEventNotifier(eventNotifier);
+        eventNotifier.init();
+        return context;
+    }
+
+    // verify that the 'inflight' gauge is registered and working
+    @Test
+    public void testCamelInflightInstrument() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(1);
+
+        // verify gauges registered for both routes
+        assertEquals(0, inFlightExchangesForRoute("foo"));
+        assertEquals(0, inFlightExchangesForRoute("bar"));
+
+        // verify we have an 'in flight' instrument for each route
+        assertEquals(2, getAllPointData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT).size());
+
+        verifyInflightExchange(mock, () -> {
+            assertEquals(1L, inFlightExchangesForRoute("foo"));
+        });
+
+        template.sendBody("direct:foo", "Hello");
+
+        assertEquals(0, inFlightExchangesForRoute("foo"));
+        assertEquals(0, inFlightExchangesForRoute("bar"));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testElapsedTimerEvents() throws Exception {
+        int count = 10;
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(count);
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                template.sendBody("direct:foo", "Hello " + i);
+            } else {
+                template.sendBody("direct:bar", "Hello " + i);
+            }
+        }
+
+        mock.assertIsSatisfied();
+
+        verifyElapsedTimerHistogramMetric("bar", DELAY, count / 2);
+        verifyElapsedTimerHistogramMetric("foo", 0, count / 2);
+    }
+
+    @Test
+    public void testSentTimerEvents() throws Exception {
+        int count = 10;
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(count);
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                template.sendBody("direct:foo", "Hello " + i);
+            } else {
+                template.sendBody("direct:bar", "Hello " + i);
+            }
+        }
+
+        mock.assertIsSatisfied();
+
+        verifySentTimerHistogramMetric("bar", DELAY, count / 2);
+        verifySentTimerHistogramMetric("foo", 0, count / 2);
+    }
+
+    @Test
+    public void testMetricData() {
+        template.sendBody("direct:bar", "Hello");
+
+        List<MetricData> ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+        assertEquals(1, ls.size());
+        MetricData md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.elapsed", md.getName());
+        assertEquals("Time taken to complete exchange", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.sent", md.getName());
+        assertEquals("Time taken to send message to the endpoint", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.inflight", md.getName());
+        assertEquals("Route in flight messages", md.getDescription());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.last.time", md.getName());
+        assertEquals("Last exchange processed time since the Unix epoch", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+    }
+
+    @Test
+    public void testMetricMultipleRouteData() {
+        template.sendBody("direct:foo", "Hello");
+        template.sendBody("direct:bar", "Hello");
+
+        List<MetricData> ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+        assertEquals(1, ls.size());
+        MetricData md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.elapsed", md.getName());
+        assertEquals("Time taken to complete exchange", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.sent", md.getName());
+        assertEquals("Time taken to send message to the endpoint", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.inflight", md.getName());
+        assertEquals("Route in flight messages", md.getDescription());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.last.time", md.getName());
+        assertEquals("Last exchange processed time since the Unix epoch", md.getDescription());
+        assertEquals("milliseconds", md.getUnit());
+    }
+
+    @Test
+    public void testLastTimeInstrument() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:foo", "Hello");
+        Thread.sleep(DELAY);
+
+        long diff = System.currentTimeMillis() - getLastTimeInstrument();
+        assertTrue(diff >= DELAY && diff < DELAY + TOLERANCE, "last time instrument");
+    }
+
+    private void verifyElapsedTimerHistogramMetric(String routeId, long delay, int msgCount) {
+        PointData pd = getPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER, routeId);
+        assertEquals(routeId, pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+        assertEquals("direct://" + routeId, pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE)));
+        assertEquals("CamelExchangeEvent", pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+        assertEquals("ExchangeCompletedEvent", pd.getAttributes().get(stringKey(EVENT_TYPE_ATTRIBUTE)));
+        assertEquals("false", pd.getAttributes().get(stringKey(FAILED_ATTRIBUTE)));
+
+        // verify instrument recorded values
+        assertInstanceOf(HistogramPointData.class, pd);
+        HistogramPointData hpd = (HistogramPointData) pd;
+        assertTrue(hpd.getMax() < delay + TOLERANCE, "max value");
+        assertTrue(hpd.getMin() >= delay, "min value");
+        assertEquals(msgCount, hpd.getCount(), "count");
+        assertTrue(hpd.getSum() >= msgCount * delay, "sum");
+    }
+
+    private void verifySentTimerHistogramMetric(String routeId, long delay, int msgCount) {
+        List<PointData> ls = getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, routeId);
+        assertEquals(2, ls.size());
+        for (PointData pd : ls) {
+            String endPoint = pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+            assertNotNull(endPoint);
+            if (("direct://" + routeId).equals(endPoint)) {
+                verifySentTimerHistogramMetric(pd, delay + TOLERANCE, delay, msgCount);
+            } else if ("mock://result".equals(endPoint)) {
+                verifySentTimerHistogramMetric(pd, msgCount, 0, msgCount);
+            } else {
+                throw new IllegalStateException("Unexpected endpoint name: " + endPoint);
+            }
+            assertEquals(routeId, pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+            assertEquals("CamelExchangeEvent", pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+            assertEquals("ExchangeSentEvent", pd.getAttributes().get(stringKey(EVENT_TYPE_ATTRIBUTE)));
+            assertEquals("false", pd.getAttributes().get(stringKey(FAILED_ATTRIBUTE)));
+        }
+    }
+
+    private long getLastTimeInstrument() {
+        List<PointData> ls = getAllPointData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+        assertEquals(1, ls.size());
+        PointData pd = ls.get(0);
+        assertInstanceOf(LongPointData.class, pd);
+        return ((LongPointData) pd).getValue();
+    }
+
+    private void verifyInflightExchange(MockEndpoint mock, ThrowingRunnable tr) {
+        mock.returnReplyBody(new ExpressionAdapter() {
+            @Override
+            public Object evaluate(Exchange exchange) {
+                try {
+                    Awaitility.await().pollDelay(DELAY, TimeUnit.MILLISECONDS)
+                            .catchUncaughtExceptions().untilAsserted(tr);
+                    return exchange.getIn().getBody();
+                } catch (Exception e) {
+                    if (e.getCause() instanceof InterruptedException) {
+                        throw new CamelExecutionException(e.getMessage(), exchange, e);
+                    } else {
+                        throw new RuntimeException("Unexpected Exception", e);
+                    }
+                }
+            }
+        });
+    }
+
+    private long inFlightExchangesForRoute(String routeId) {
+        PointData pd = getPointDataForRouteId(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT, routeId);
+        assertInstanceOf(LongPointData.class, pd, "Expected LongPointData");
+        assertEquals(routeId, pd.getAttributes().get(stringKey(ROUTE_ID_ATTRIBUTE)));
+        assertEquals("CamelExchangeEvent", pd.getAttributes().get(stringKey(KIND_ATTRIBUTE)));
+        return ((LongPointData) pd).getValue();
+    }
+
+    private void verifySentTimerHistogramMetric(PointData pd, long max, long min, int msgCount) {
+        assertInstanceOf(HistogramPointData.class, pd);
+        HistogramPointData hpd = (HistogramPointData) pd;
+        assertTrue(hpd.getMax() < max, "max value");
+        assertTrue(hpd.getMin() >= min, "min value");
+        assertEquals(msgCount, hpd.getCount(), "count");
+        assertTrue(hpd.getSum() >= msgCount * min, "sum");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct://foo").routeId("foo").to("mock://result");
+                from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+            }
+        };
+    }
+}
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java
new file mode 100644
index 000000000000..81f7138f7d8e
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeEventNotifierTimeUnitTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.AbstractOpenTelemetryTest;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryExchangeEventNotifierTimeUnitTest extends AbstractOpenTelemetryTest {
+
+    private static final Long DELAY = 1100L;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        OpenTelemetryExchangeEventNotifier eventNotifier = new OpenTelemetryExchangeEventNotifier();
+        eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+
+        // override default time unit from milliseconds to seconds
+        eventNotifier.setDurationUnit(TimeUnit.SECONDS);
+        eventNotifier.setLastExchangeUnit(TimeUnit.SECONDS);
+
+        context.getManagementStrategy().addEventNotifier(eventNotifier);
+        eventNotifier.init();
+        return context;
+    }
+
+    @Test
+    public void testElapsedTimerEvents() throws Exception {
+        int count = 3;
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+            template.sendBody("direct:bar", "Hello " + i);
+        }
+
+        mock.assertIsSatisfied();
+        verifyElapsedTimerHistogramMetric("bar", Math.floorDiv(DELAY, 1000L), count);
+    }
+
+    private void verifyElapsedTimerHistogramMetric(String routeId, long delay, int msgCount) {
+        PointData pd = getPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER, routeId);
+        assertInstanceOf(HistogramPointData.class, pd);
+        HistogramPointData hpd = (HistogramPointData) pd;
+        // histogram values are in seconds
+        assertTrue(hpd.getMax() == delay, "max value");
+        assertTrue(hpd.getMin() == delay, "min value");
+        assertTrue(hpd.getSum() >= msgCount * delay, "sum");
+        assertEquals(msgCount, hpd.getCount(), "count");
+    }
+
+    @Test
+    public void testMetricData() {
+        template.sendBody("direct:bar", "Hello");
+
+        List<MetricData> ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+        assertEquals(1, ls.size());
+        MetricData md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.elapsed", md.getName());
+        assertEquals("Time taken to complete exchange", md.getDescription());
+        // time unit should be in seconds as configured
+        assertEquals("seconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.HISTOGRAM, md.getType());
+        assertEquals("camel.exchange.sent", md.getName());
+        assertEquals("Time taken to send message to the endpoint", md.getDescription());
+        // time unit should be in seconds as configured
+        assertEquals("seconds", md.getUnit());
+
+        ls = getMetricData(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.inflight", md.getName());
+        assertEquals("Route in flight messages", md.getDescription());
+
+        ls = getMetricData(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+        assertEquals(1, ls.size());
+        md = ls.get(0);
+        assertEquals(MetricDataType.LONG_GAUGE, md.getType());
+        assertEquals("camel.exchanges.last.time", md.getName());
+        assertEquals("Last exchange processed time since the Unix epoch", md.getDescription());
+        // time unit should be in seconds as configured
+        assertEquals("seconds", md.getUnit());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+            }
+        };
+    }
+}
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java
new file mode 100644
index 000000000000..5e5dffb880eb
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/eventnotifier/OpenTelemetryExchangeNotifierStaticTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.eventnotifier;
+
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ENDPOINT_NAME_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+public class OpenTelemetryExchangeNotifierStaticTest extends OpenTelemetryExchangeEventNotifierDynamicTest {
+
+    @Override
+    protected OpenTelemetryExchangeEventNotifier getEventNotifier() {
+        OpenTelemetryExchangeEventNotifier eventNotifier = new OpenTelemetryExchangeEventNotifier();
+        eventNotifier.setMeter(otelExtension.getOpenTelemetry().getMeter("meterTest"));
+        // create a single metric for the base endpoint URI, i.e. without query parameters
+        eventNotifier.setBaseEndpointURI(true);
+        return eventNotifier;
+    }
+
+    @Test
+    public void testEventNotifier() throws Exception {
+        int count = 10;
+        MockEndpoint mock = getMockEndpoint("mock://out");
+        mock.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+            template.sendBody("direct://in", i);
+        }
+
+        mock.assertIsSatisfied();
+
+        int nameCount = 0;
+        for (PointData pd : getAllPointDataForRouteId(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER, "test")) {
+            String name = pd.getAttributes().get(stringKey(ENDPOINT_NAME_ATTRIBUTE));
+            if (name != null && name.startsWith("mc://component")) {
+                nameCount++;
+                assertInstanceOf(HistogramPointData.class, pd);
+                HistogramPointData hpd = (HistogramPointData) pd;
+                assertEquals(count, hpd.getCount());
+            }
+        }
+        assertEquals(1, nameCount, "Only one measure should be present for 'mc://component' endpoint.");
+    }
+}
diff --git a/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java
new file mode 100644
index 000000000000..a1cf31b58a2c
--- /dev/null
+++ b/components/camel-opentelemetry-metrics/src/test/java/org/apache/camel/opentelemetry/metrics/integration/eventnotifier/ExchangeEventNotifierAutoConfigIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry.metrics.integration.eventnotifier;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.sdk.metrics.data.GaugeData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.opentelemetry.metrics.eventnotifier.OpenTelemetryExchangeEventNotifier;
+import org.apache.camel.opentelemetry.metrics.integration.MemoryLogHandler;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_EXCHANGE_SENT_TIMER;
+import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test auto-configuration of OpenTelemetryExchangeEventNotifier relying on OpenTelemetry global autoconfigure.
+ */
+public class ExchangeEventNotifierAutoConfigIT extends CamelTestSupport {
+
+    private static final Long DELAY = 250L;
+
+    @BeforeAll
+    public static void init() {
+        GlobalOpenTelemetry.resetForTest();
+        // open telemetry auto configuration using console exporter that writes to logging
+        System.setProperty("otel.java.global-autoconfigure.enabled", "true");
+        System.setProperty("otel.metrics.exporter", "console");
+        System.setProperty("otel.traces.exporter", "none");
+        System.setProperty("otel.logs.exporter", "none");
+        System.setProperty("otel.propagators", "tracecontext");
+        System.setProperty("otel.metric.export.interval", "50");
+    }
+
+    @AfterEach
+    void cleanup() {
+        GlobalOpenTelemetry.resetForTest();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        // not setting any meter explicitly, relying on opentelemetry autoconfigure
+        OpenTelemetryExchangeEventNotifier eventNotifier = new OpenTelemetryExchangeEventNotifier();
+        context.getManagementStrategy().addEventNotifier(eventNotifier);
+        eventNotifier.init();
+        return context;
+    }
+
+    @Test
+    public void testElapsedTimerEvents() throws Exception {
+        Logger logger = Logger.getLogger(LoggingMetricExporter.class.getName());
+        MemoryLogHandler handler = new MemoryLogHandler();
+        logger.addHandler(handler);
+
+        int count = 6;
+        MockEndpoint mock = getMockEndpoint("mock://result");
+        mock.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                template.sendBody("direct:foo", "Hello " + i);
+            } else {
+                template.sendBody("direct:bar", "Hello " + i);
+            }
+        }
+
+        mock.assertIsSatisfied();
+
+        await().atMost(Duration.ofMillis(1000L)).until(() -> !handler.getLogs().isEmpty());
+
+        List<LogRecord> logs = new ArrayList<>(handler.getLogs());
+        Map<String, Integer> counts = new HashMap<>();
+        for (LogRecord log : logs) {
+            if (log.getParameters() != null && log.getParameters().length > 0) {
+                MetricData metricData = (MetricData) log.getParameters()[0];
+                counts.compute(metricData.getName(), (k, v) -> v == null ? 1 : v + 1);
+                switch (metricData.getName()) {
+                    case DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER,
+                            DEFAULT_CAMEL_EXCHANGE_SENT_TIMER -> {
+                        // histogram
+                        assertInstanceOf(HistogramData.class, metricData.getData());
+                    }
+                    case DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT,
+                            DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT -> {
+                        // gauge
+                        assertInstanceOf(GaugeData.class, metricData.getData());
+                    }
+                    default -> fail();
+                }
+            }
+        }
+        assertEquals(4, counts.size());
+        assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER) > 0,
+                "Should have metric log for " + DEFAULT_CAMEL_EXCHANGE_ELAPSED_TIMER);
+        assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_SENT_TIMER) > 0,
+                "Should have metric log for " + DEFAULT_CAMEL_EXCHANGE_SENT_TIMER);
+        assertTrue(counts.get(DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT) > 0,
+                "Should have metric log for " + DEFAULT_CAMEL_EXCHANGE_LAST_PROCESSED_TIME_INSTRUMENT);
+        assertTrue(counts.get(DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT) > 0,
+                "Should have metric log for " + DEFAULT_CAMEL_ROUTES_EXCHANGES_INFLIGHT);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct://foo").routeId("foo").to("mock://result");
+                from("direct://bar").routeId("bar").delay(DELAY).to("mock://result");
+            }
+        };
+    }
+}
