This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f2c5b46be30 CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID (#23765)
0f2c5b46be30 is described below

commit 0f2c5b46be301de3ed960304eebb8a2580d962c9
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 4 15:10:23 2026 +0200

    CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID (#23765)
    
    * CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID
    
    Capture the breadcrumb ID (CamelBreadcrumbId header) in BacklogTracer
    event messages and use it to correlate message history across broker
    boundaries. When exchanges pass through Kafka, SEDA, JMS, etc., each
    consumer creates a new independent exchange - the breadcrumb ID links
    them together so the history tab shows the full end-to-end flow.
    
    Falls back to exchange ID / correlation ID matching when breadcrumb
    is not set (e.g. when useBreadcrumb is not enabled on the CamelContext).
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
    
    * CAMEL-23684: Add unit test for breadcrumb history correlation across SEDA
    
    Test verifies that message history captures events from multiple routes
    connected via SEDA (which creates independent exchanges), correlated by
    breadcrumb ID. Also fixes the completion logic: when a downstream route
    that continues the last completed breadcrumb flow finishes, its events
    are merged into the existing complete history instead of replacing it,
    so downstream routes accumulate into the same history.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
    
    ---------
    
    Signed-off-by: Claus Ibsen <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../camel/spi/BacklogTracerEventMessage.java       |   8 ++
 .../apache/camel/impl/debugger/BacklogTracer.java  |  50 ++++++++--
 .../impl/debugger/DefaultBacklogDebugger.java      |  10 +-
 .../debugger/DefaultBacklogTracerEventMessage.java |  12 ++-
 .../camel/impl/engine/CamelInternalProcessor.java  |  18 ++--
 .../BacklogTracerMessageHistoryBreadcrumbTest.java | 111 +++++++++++++++++++++
 6 files changed, 190 insertions(+), 19 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 7956e8587e53..a87c08ec0b6a 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -129,6 +129,14 @@ public interface BacklogTracerEventMessage extends 
BacklogEventMessage {
     @Nullable
     String getCorrelationExchangeId();
 
+    /**
+     * The breadcrumb id that links exchanges across broker boundaries (Kafka, 
SEDA, JMS, etc.)
+     *
+     * @since 4.21
+     */
+    @Nullable
+    String getBreadcrumbId();
+
     /**
      * The name of the thread that is processing the message, when this event 
was captured.
      */
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 2bc0a9543535..873e31e628d1 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -65,6 +65,7 @@ public class BacklogTracer extends ServiceSupport implements 
org.apache.camel.sp
     // use tracer to capture additional information for capturing latest 
completed exchange message-history
     private final Queue<BacklogTracerEventMessage> provisionalHistoryQueue = 
new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
     private final Queue<BacklogTracerEventMessage> completeHistoryQueue = new 
LinkedBlockingQueue<>(MAX_BACKLOG_SIZE + 1);
+    private volatile String lastCompletedBreadcrumbId;
     private boolean removeOnDump = true;
     private int bodyMaxChars = 32 * 1024;
     private boolean bodyIncludeStreams;
@@ -164,6 +165,7 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
         String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50);
         String exchangeId = exchange.getExchangeId();
         String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+        String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
         int level = node.getLevel();
         String fromRouteId = exchange.getFromRouteId();
         String source = LoggerHelper.getLineNumberLoggerName(node);
@@ -172,7 +174,7 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
         DefaultBacklogTracerEventMessage event = new 
DefaultBacklogTracerEventMessage(
                 camelContext, first, last, incrementTraceCounter(), timestamp, 
source, fromRouteId, fromRouteId, toNode,
                 toNodeParentId, null, null, toNodeShortName, toNodeLabel, 
level,
-                exchangeId, correlationExchangeId, false, false, data);
+                exchangeId, correlationExchangeId, breadcrumbId, false, false, 
data);
         if ((first || last) && fromRouteId != null) {
             Route route = camelContext.getRoute(fromRouteId);
             if (route != null && route.getConsumer() != null) {
@@ -195,21 +197,51 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
 
         // handle capturing events for last full completed exchange (aka 
replay)
         if (camelContext.isMessageHistory()) {
-            String tid = null;
             var head = provisionalHistoryQueue.peek();
+            String bid = null;
+            String tid = null;
             if (head != null) {
+                bid = head.getBreadcrumbId();
                 tid = head.getExchangeId();
             }
-            if (tid == null || tid.equals(event.getExchangeId()) || 
tid.equals(event.getCorrelationExchangeId())) {
+            // correlate by breadcrumb ID when available (links exchanges 
across broker boundaries)
+            // fallback to exchange ID / correlation ID matching when 
breadcrumb is not set
+            boolean match;
+            if (bid != null && event.getBreadcrumbId() != null) {
+                match = bid.equals(event.getBreadcrumbId());
+            } else {
+                match = tid == null || tid.equals(event.getExchangeId()) || 
tid.equals(event.getCorrelationExchangeId());
+            }
+            // check if this event continues a previously completed breadcrumb 
flow
+            // (e.g. a downstream route connected via Kafka/SEDA that starts 
after the originating route finished)
+            boolean appendMode = false;
+            if (head == null && event.getBreadcrumbId() != null
+                    && 
event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+                appendMode = true;
+            } else if (head != null && event.getBreadcrumbId() != null
+                    && 
event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)
+                    && head.getBreadcrumbId() != null
+                    && 
head.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+                appendMode = true;
+            }
+            if (match || appendMode) {
                 boolean added = provisionalHistoryQueue.offer(event);
                 boolean original = head != null && event.getRouteId() != null 
&& event.getRouteId().equals(head.getRouteId());
                 if (event.isLast() && original) {
-                    // only trigger completion when it's the original last
-                    completeHistoryQueue.clear();
-                    completeHistoryQueue.addAll(provisionalHistoryQueue);
-                    // in case we hit the limit then ensure the last is always 
added to the complete history
-                    if (!added) {
-                        completeHistoryQueue.add(event);
+                    if (appendMode) {
+                        // downstream route finished: merge into existing 
complete history
+                        completeHistoryQueue.addAll(provisionalHistoryQueue);
+                        if (!added) {
+                            completeHistoryQueue.add(event);
+                        }
+                    } else {
+                        // originating route finished: replace complete history
+                        completeHistoryQueue.clear();
+                        completeHistoryQueue.addAll(provisionalHistoryQueue);
+                        if (!added) {
+                            completeHistoryQueue.add(event);
+                        }
+                        lastCompletedBreadcrumbId = event.getBreadcrumbId();
                     }
                     provisionalHistoryQueue.clear();
                 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 7adf38f6b7c8..09d9aad0e903 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -896,6 +896,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                         message.getToNodeParentWhenLabel(),
                         message.getToNodeShortName(), message.getToNodeLabel(),
                         message.getToNodeLevel(), message.getExchangeId(), 
message.getCorrelationExchangeId(),
+                        message.getBreadcrumbId(),
                         false, false,
                         dumpAsJSonObject(suspendedExchange.getExchange())));
     }
@@ -939,6 +940,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = CamelContextHelper.getRouteId(definition);
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -948,7 +950,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                     = new DefaultBacklogTracerEventMessage(
                             camelContext,
                             first, false, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId, null, null,
-                            toNodeShortName, toNodeLabel, level, exchangeId, 
correlationExchangeId,
+                            toNodeShortName, toNodeLabel, level, exchangeId, 
correlationExchangeId, breadcrumbId,
                             false, false, data);
             suspendedBreakpointMessages.put(nodeId, msg);
 
@@ -1030,6 +1032,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = CamelContextHelper.getRouteId(definition);
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -1039,7 +1042,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                             camelContext,
                             false, false, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId, null, null,
                             toNodeShortName, toNodeLabel, level,
-                            exchangeId, correlationExchangeId,
+                            exchangeId, correlationExchangeId, breadcrumbId,
                             false, false, data);
             suspendedBreakpointMessages.put(toNode, msg);
 
@@ -1140,6 +1143,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = route != null ? route.getRouteId() : toNode;
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(route != null 
? route : definition);
@@ -1148,7 +1152,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                     = new DefaultBacklogTracerEventMessage(
                             camelContext,
                             false, true, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId,
-                            null, null, null, null, level, exchangeId, 
correlationExchangeId,
+                            null, null, null, null, level, exchangeId, 
correlationExchangeId, breadcrumbId,
                             false, false, data);
             // we want to capture if there was an exception
             if (cause != null) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index 7e8b0edabc38..dc0dd0b8755f 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -54,6 +54,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
     private final int toNodeLevel;
     private final String exchangeId;
     private final String correlationExchangeId;
+    private final String breadcrumbId;
     private final String threadName;
     private String endpointUri;
     private boolean remoteEndpoint;
@@ -77,7 +78,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
                                             String toNodeParentId,
                                             String toNodeParentWhenId, String 
toNodeParentWhenLabel,
                                             String toNodeShortName, String 
toNodeLabel, int toNodeLevel, String exchangeId,
-                                            String correlationExchangeId,
+                                            String correlationExchangeId, 
String breadcrumbId,
                                             boolean rest, boolean template, 
JsonObject data) {
         this.camelContext = camelContext;
         this.watch = new StopWatch();
@@ -97,6 +98,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         this.toNodeLevel = toNodeLevel;
         this.exchangeId = exchangeId;
         this.correlationExchangeId = correlationExchangeId;
+        this.breadcrumbId = breadcrumbId;
         this.rest = rest;
         this.template = template;
         this.threadName = Thread.currentThread().getName();
@@ -199,6 +201,11 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         return correlationExchangeId;
     }
 
+    @Override
+    public String getBreadcrumbId() {
+        return breadcrumbId;
+    }
+
     @Override
     public String getProcessingThreadName() {
         return threadName;
@@ -587,6 +594,9 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         if (correlationExchangeId != null) {
             jo.put("correlationExchangeId", correlationExchangeId);
         }
+        if (breadcrumbId != null) {
+            jo.put("breadcrumbId", breadcrumbId);
+        }
         if (timestamp > 0) {
             jo.put("timestamp", timestamp);
         }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 66c56142faf0..530b8534c4d6 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -649,6 +649,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = LoggerHelper.getLineNumberLoggerName(input);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = routeDefinition.getRouteId();
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -666,7 +667,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         input.getId(),
                         null, null, null,
                         input.getShortName(), input.getLabel(),
-                        level, exchangeId, correlationExchangeId, rest, 
template, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -687,6 +688,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = LoggerHelper.getLineNumberLoggerName(input);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = routeDefinition.getRouteId();
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -704,7 +706,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         input.getId(),
                         null, null, null,
                         input.getShortName(), input.getLabel(),
-                        level, exchangeId, correlationExchangeId, rest, 
template, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -775,6 +777,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
         public DefaultBacklogTracerEventMessage before(Exchange exchange) 
throws Exception {
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = processorDefinition.getLevel();
             String routeId = ExchangeHelper.getAtRouteId(exchange);
             String fromRouteId = exchange.getFromRouteId();
@@ -794,7 +797,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                     processorDefinition.getId(),
                     null, null, null,
                     processorDefinition.getShortName(), 
processorDefinition.getLabel(),
-                    level + 1, exchangeId, correlationExchangeId, false, 
false, data);
+                    level + 1, exchangeId, correlationExchangeId, 
breadcrumbId, false, false, data);
             backlogTracer.traceEvent(event);
             return event;
         }
@@ -806,6 +809,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = 
LoggerHelper.getLineNumberLoggerName(processorDefinition);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = ExchangeHelper.getAtRouteId(exchange);
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -823,7 +827,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         processorDefinition.getId(),
                         null, null, null,
                         processorDefinition.getShortName(), 
processorDefinition.getLabel(),
-                        level, exchangeId, correlationExchangeId, false, 
false, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, false, false, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -931,6 +935,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String toNodeLabel = 
StringHelper.limitLength(processorDefinition.getLabel(), 50);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 int level = processorDefinition.getLevel();
 
                 boolean includeExchangeProperties = 
backlogTracer.isIncludeExchangeProperties();
@@ -951,7 +956,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         toNodeParentId,
                         toNodeParentWhenId, toNodeParentWhenLabel,
                         toNodeShortName, toNodeLabel, level,
-                        exchangeId, correlationExchangeId, rest, template, 
data);
+                        exchangeId, correlationExchangeId, breadcrumbId, rest, 
template, data);
                 backlogTracer.traceEvent(event);
                 return event;
             }
@@ -969,6 +974,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                     String fromRouteId = exchange.getFromRouteId();
                     String exchangeId = exchange.getExchangeId();
                     String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                    String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                     boolean includeExchangeProperties = 
backlogTracer.isIncludeExchangeProperties();
                     boolean includeExchangeVariables = 
backlogTracer.isIncludeExchangeVariables();
                     long created = exchange.getClock().getCreated();
@@ -985,7 +991,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                             false, true, 
backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, 
toNode,
                             null, null,
                             null, toNodeShortName, toNodeLabel,
-                            level, exchangeId, correlationExchangeId, rest, 
template, data);
+                            level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                     backlogTracer.traceEvent(pseudoLast);
                     doneProcessing(exchange, pseudoLast);
                     doneProcessing(exchange, pseudoFirst);
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
new file mode 100644
index 000000000000..4235eeb71939
--- /dev/null
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerMessageHistoryBreadcrumbTest extends 
ManagementTestSupport {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMessageHistoryCorrelatesBySedaBreadcrumb() throws 
Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on
+                = new ObjectName("org.apache.camel:context=" + 
context.getManagementName() + ",type=tracer,name=BacklogTracer");
+        assertNotNull(on);
+        assertTrue(mbeanServer.isRegistered(on));
+
+        // disable remove-on-dump so polling in await doesn't drain the queue
+        mbeanServer.setAttribute(on, new 
javax.management.Attribute("RemoveOnDump", Boolean.FALSE));
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // wait for the history to be completed (the seda consumer runs async)
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            List<BacklogTracerEventMessage> events
+                    = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, 
"dumpLatestMessageHistory", null, null);
+            assertNotNull(events);
+
+            // verify events span multiple routes (connected via seda which 
creates independent exchanges)
+            Set<String> routeIds = events.stream()
+                    .map(BacklogTracerEventMessage::getRouteId)
+                    .collect(Collectors.toSet());
+            assertTrue(routeIds.contains("starter"), "Should contain starter 
route events");
+            assertTrue(routeIds.contains("processor"), "Should contain 
processor route events");
+
+            // all events should share the same breadcrumb ID
+            String breadcrumb = events.get(0).getBreadcrumbId();
+            assertNotNull(breadcrumb, "Breadcrumb ID should be set");
+            for (BacklogTracerEventMessage event : events) {
+                assertEquals(breadcrumb, event.getBreadcrumbId(),
+                        "All events should share the same breadcrumb ID");
+            }
+
+            // verify events span multiple exchange IDs (seda creates new 
exchanges)
+            Set<String> exchangeIds = events.stream()
+                    .map(BacklogTracerEventMessage::getExchangeId)
+                    .collect(Collectors.toSet());
+            assertTrue(exchangeIds.size() > 1,
+                    "Should have events from multiple exchange IDs (seda 
creates new exchanges)");
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                context.setUseBreadcrumb(true);
+                context.setBacklogTracing(true);
+                context.setMessageHistory(true);
+
+                from("direct:start").routeId("starter")
+                        .to("mock:a").id("a")
+                        .to("seda:next");
+
+                from("seda:next").routeId("processor")
+                        .to("mock:b").id("b")
+                        .to("mock:result").id("result");
+            }
+        };
+    }
+
+}

Reply via email to