This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0f2c5b46be30 CAMEL-23684: BacklogTracer - correlate message history by
breadcrumb ID (#23765)
0f2c5b46be30 is described below
commit 0f2c5b46be301de3ed960304eebb8a2580d962c9
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 4 15:10:23 2026 +0200
CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID
(#23765)
* CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID
Capture the breadcrumb ID (CamelBreadcrumbId header) in BacklogTracer
event messages and use it to correlate message history across broker
boundaries. When exchanges pass through Kafka, SEDA, JMS, etc., each
consumer creates a new independent exchange - the breadcrumb ID links
them together so the history tab shows the full end-to-end flow.
Falls back to exchange ID / correlation ID matching when breadcrumb
is not set (e.g. when allowUseOriginalMessage is disabled).
Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
* CAMEL-23684: Add unit test for breadcrumb history correlation across SEDA
Test verifies that message history captures events from multiple routes
connected via SEDA (which creates independent exchanges), correlated by
breadcrumb ID. Also fixes the completion logic: in breadcrumb mode,
snapshot on every route completion without clearing the provisional
queue, so downstream routes accumulate into the same history.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
---------
Signed-off-by: Claus Ibsen <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../camel/spi/BacklogTracerEventMessage.java | 8 ++
.../apache/camel/impl/debugger/BacklogTracer.java | 50 ++++++++--
.../impl/debugger/DefaultBacklogDebugger.java | 10 +-
.../debugger/DefaultBacklogTracerEventMessage.java | 12 ++-
.../camel/impl/engine/CamelInternalProcessor.java | 18 ++--
.../BacklogTracerMessageHistoryBreadcrumbTest.java | 111 +++++++++++++++++++++
6 files changed, 190 insertions(+), 19 deletions(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 7956e8587e53..a87c08ec0b6a 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -129,6 +129,14 @@ public interface BacklogTracerEventMessage extends
BacklogEventMessage {
@Nullable
String getCorrelationExchangeId();
+ /**
+ * The breadcrumb id that links exchanges across broker boundaries (Kafka,
SEDA, JMS, etc.)
+ *
+ * @since 4.21
+ */
+ @Nullable
+ String getBreadcrumbId();
+
/**
* The name of the thread that is processing the message, when this event
was captured.
*/
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 2bc0a9543535..873e31e628d1 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -65,6 +65,7 @@ public class BacklogTracer extends ServiceSupport implements
org.apache.camel.sp
// use tracer to capture additional information for capturing latest
completed exchange message-history
private final Queue<BacklogTracerEventMessage> provisionalHistoryQueue =
new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
private final Queue<BacklogTracerEventMessage> completeHistoryQueue = new
LinkedBlockingQueue<>(MAX_BACKLOG_SIZE + 1);
+ private volatile String lastCompletedBreadcrumbId;
private boolean removeOnDump = true;
private int bodyMaxChars = 32 * 1024;
private boolean bodyIncludeStreams;
@@ -164,6 +165,7 @@ public class BacklogTracer extends ServiceSupport
implements org.apache.camel.sp
String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = node.getLevel();
String fromRouteId = exchange.getFromRouteId();
String source = LoggerHelper.getLineNumberLoggerName(node);
@@ -172,7 +174,7 @@ public class BacklogTracer extends ServiceSupport
implements org.apache.camel.sp
DefaultBacklogTracerEventMessage event = new
DefaultBacklogTracerEventMessage(
camelContext, first, last, incrementTraceCounter(), timestamp,
source, fromRouteId, fromRouteId, toNode,
toNodeParentId, null, null, toNodeShortName, toNodeLabel,
level,
- exchangeId, correlationExchangeId, false, false, data);
+ exchangeId, correlationExchangeId, breadcrumbId, false, false,
data);
if ((first || last) && fromRouteId != null) {
Route route = camelContext.getRoute(fromRouteId);
if (route != null && route.getConsumer() != null) {
@@ -195,21 +197,51 @@ public class BacklogTracer extends ServiceSupport
implements org.apache.camel.sp
// handle capturing events for last full completed exchange (aka
replay)
if (camelContext.isMessageHistory()) {
- String tid = null;
var head = provisionalHistoryQueue.peek();
+ String bid = null;
+ String tid = null;
if (head != null) {
+ bid = head.getBreadcrumbId();
tid = head.getExchangeId();
}
- if (tid == null || tid.equals(event.getExchangeId()) ||
tid.equals(event.getCorrelationExchangeId())) {
+ // correlate by breadcrumb ID when available (links exchanges
across broker boundaries)
+ // fallback to exchange ID / correlation ID matching when
breadcrumb is not set
+ boolean match;
+ if (bid != null && event.getBreadcrumbId() != null) {
+ match = bid.equals(event.getBreadcrumbId());
+ } else {
+ match = tid == null || tid.equals(event.getExchangeId()) ||
tid.equals(event.getCorrelationExchangeId());
+ }
+ // check if this event continues a previously completed breadcrumb
flow
+ // (e.g. a downstream route connected via Kafka/SEDA that starts
after the originating route finished)
+ boolean appendMode = false;
+ if (head == null && event.getBreadcrumbId() != null
+ &&
event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+ appendMode = true;
+ } else if (head != null && event.getBreadcrumbId() != null
+ &&
event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)
+ && head.getBreadcrumbId() != null
+ &&
head.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+ appendMode = true;
+ }
+ if (match || appendMode) {
boolean added = provisionalHistoryQueue.offer(event);
boolean original = head != null && event.getRouteId() != null
&& event.getRouteId().equals(head.getRouteId());
if (event.isLast() && original) {
- // only trigger completion when it's the original last
- completeHistoryQueue.clear();
- completeHistoryQueue.addAll(provisionalHistoryQueue);
- // in case we hit the limit then ensure the last is always
added to the complete history
- if (!added) {
- completeHistoryQueue.add(event);
+ if (appendMode) {
+ // downstream route finished: merge into existing
complete history
+ completeHistoryQueue.addAll(provisionalHistoryQueue);
+ if (!added) {
+ completeHistoryQueue.add(event);
+ }
+ } else {
+ // originating route finished: replace complete history
+ completeHistoryQueue.clear();
+ completeHistoryQueue.addAll(provisionalHistoryQueue);
+ if (!added) {
+ completeHistoryQueue.add(event);
+ }
+ lastCompletedBreadcrumbId = event.getBreadcrumbId();
}
provisionalHistoryQueue.clear();
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 7adf38f6b7c8..09d9aad0e903 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -896,6 +896,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
message.getToNodeParentWhenLabel(),
message.getToNodeShortName(), message.getToNodeLabel(),
message.getToNodeLevel(), message.getExchangeId(),
message.getCorrelationExchangeId(),
+ message.getBreadcrumbId(),
false, false,
dumpAsJSonObject(suspendedExchange.getExchange())));
}
@@ -939,6 +940,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = definition.getLevel();
long uid = debugCounter.incrementAndGet();
String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -948,7 +950,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
= new DefaultBacklogTracerEventMessage(
camelContext,
first, false, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId, null, null,
- toNodeShortName, toNodeLabel, level, exchangeId,
correlationExchangeId,
+ toNodeShortName, toNodeLabel, level, exchangeId,
correlationExchangeId, breadcrumbId,
false, false, data);
suspendedBreakpointMessages.put(nodeId, msg);
@@ -1030,6 +1032,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = definition.getLevel();
long uid = debugCounter.incrementAndGet();
String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -1039,7 +1042,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
camelContext,
false, false, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId, null, null,
toNodeShortName, toNodeLabel, level,
- exchangeId, correlationExchangeId,
+ exchangeId, correlationExchangeId, breadcrumbId,
false, false, data);
suspendedBreakpointMessages.put(toNode, msg);
@@ -1140,6 +1143,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String routeId = route != null ? route.getRouteId() : toNode;
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = definition.getLevel();
long uid = debugCounter.incrementAndGet();
String source = LoggerHelper.getLineNumberLoggerName(route != null
? route : definition);
@@ -1148,7 +1152,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
= new DefaultBacklogTracerEventMessage(
camelContext,
false, true, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId,
- null, null, null, null, level, exchangeId,
correlationExchangeId,
+ null, null, null, null, level, exchangeId,
correlationExchangeId, breadcrumbId,
false, false, data);
// we want to capture if there was an exception
if (cause != null) {
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index 7e8b0edabc38..dc0dd0b8755f 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -54,6 +54,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
private final int toNodeLevel;
private final String exchangeId;
private final String correlationExchangeId;
+ private final String breadcrumbId;
private final String threadName;
private String endpointUri;
private boolean remoteEndpoint;
@@ -77,7 +78,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
String toNodeParentId,
String toNodeParentWhenId, String
toNodeParentWhenLabel,
String toNodeShortName, String
toNodeLabel, int toNodeLevel, String exchangeId,
- String correlationExchangeId,
+ String correlationExchangeId,
String breadcrumbId,
boolean rest, boolean template,
JsonObject data) {
this.camelContext = camelContext;
this.watch = new StopWatch();
@@ -97,6 +98,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
this.toNodeLevel = toNodeLevel;
this.exchangeId = exchangeId;
this.correlationExchangeId = correlationExchangeId;
+ this.breadcrumbId = breadcrumbId;
this.rest = rest;
this.template = template;
this.threadName = Thread.currentThread().getName();
@@ -199,6 +201,11 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
return correlationExchangeId;
}
+ @Override
+ public String getBreadcrumbId() {
+ return breadcrumbId;
+ }
+
@Override
public String getProcessingThreadName() {
return threadName;
@@ -587,6 +594,9 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
if (correlationExchangeId != null) {
jo.put("correlationExchangeId", correlationExchangeId);
}
+ if (breadcrumbId != null) {
+ jo.put("breadcrumbId", breadcrumbId);
+ }
if (timestamp > 0) {
jo.put("timestamp", timestamp);
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 66c56142faf0..530b8534c4d6 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -649,6 +649,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String source = LoggerHelper.getLineNumberLoggerName(input);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
String routeId = routeDefinition.getRouteId();
String fromRouteId = exchange.getFromRouteId();
int level = 1;
@@ -666,7 +667,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
input.getId(),
null, null, null,
input.getShortName(), input.getLabel(),
- level, exchangeId, correlationExchangeId, rest,
template, data);
+ level, exchangeId, correlationExchangeId,
breadcrumbId, rest, template, data);
if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
first.setEndpointServiceUrl(esl.getServiceUrl());
first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -687,6 +688,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String source = LoggerHelper.getLineNumberLoggerName(input);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
String routeId = routeDefinition.getRouteId();
String fromRouteId = exchange.getFromRouteId();
int level = 1;
@@ -704,7 +706,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
input.getId(),
null, null, null,
input.getShortName(), input.getLabel(),
- level, exchangeId, correlationExchangeId, rest,
template, data);
+ level, exchangeId, correlationExchangeId,
breadcrumbId, rest, template, data);
if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
first.setEndpointServiceUrl(esl.getServiceUrl());
first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -775,6 +777,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
public DefaultBacklogTracerEventMessage before(Exchange exchange)
throws Exception {
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = processorDefinition.getLevel();
String routeId = ExchangeHelper.getAtRouteId(exchange);
String fromRouteId = exchange.getFromRouteId();
@@ -794,7 +797,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
processorDefinition.getId(),
null, null, null,
processorDefinition.getShortName(),
processorDefinition.getLabel(),
- level + 1, exchangeId, correlationExchangeId, false,
false, data);
+ level + 1, exchangeId, correlationExchangeId,
breadcrumbId, false, false, data);
backlogTracer.traceEvent(event);
return event;
}
@@ -806,6 +809,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
String routeId = ExchangeHelper.getAtRouteId(exchange);
String fromRouteId = exchange.getFromRouteId();
int level = 1;
@@ -823,7 +827,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
processorDefinition.getId(),
null, null, null,
processorDefinition.getShortName(),
processorDefinition.getLabel(),
- level, exchangeId, correlationExchangeId, false,
false, data);
+ level, exchangeId, correlationExchangeId,
breadcrumbId, false, false, data);
if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
first.setEndpointServiceUrl(esl.getServiceUrl());
first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -931,6 +935,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String toNodeLabel =
StringHelper.limitLength(processorDefinition.getLabel(), 50);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
int level = processorDefinition.getLevel();
boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
@@ -951,7 +956,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
toNodeParentId,
toNodeParentWhenId, toNodeParentWhenLabel,
toNodeShortName, toNodeLabel, level,
- exchangeId, correlationExchangeId, rest, template,
data);
+ exchangeId, correlationExchangeId, breadcrumbId, rest,
template, data);
backlogTracer.traceEvent(event);
return event;
}
@@ -969,6 +974,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String fromRouteId = exchange.getFromRouteId();
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String breadcrumbId =
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
long created = exchange.getClock().getCreated();
@@ -985,7 +991,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
false, true,
backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId,
toNode,
null, null,
null, toNodeShortName, toNodeLabel,
- level, exchangeId, correlationExchangeId, rest,
template, data);
+ level, exchangeId, correlationExchangeId,
breadcrumbId, rest, template, data);
backlogTracer.traceEvent(pseudoLast);
doneProcessing(exchange, pseudoLast);
doneProcessing(exchange, pseudoFirst);
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
new file mode 100644
index 000000000000..4235eeb71939
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryBreadcrumbTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerMessageHistoryBreadcrumbTest extends
ManagementTestSupport {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMessageHistoryCorrelatesBySedaBreadcrumb() throws
Exception {
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on
+ = new ObjectName("org.apache.camel:context=" +
context.getManagementName() + ",type=tracer,name=BacklogTracer");
+ assertNotNull(on);
+ assertTrue(mbeanServer.isRegistered(on));
+
+ // disable remove-on-dump so polling in await doesn't drain the queue
+ mbeanServer.setAttribute(on, new
javax.management.Attribute("RemoveOnDump", Boolean.FALSE));
+
+ getMockEndpoint("mock:a").expectedMessageCount(1);
+ getMockEndpoint("mock:b").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // wait for the history to be completed (the seda consumer runs async)
+ await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ List<BacklogTracerEventMessage> events
+ = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpLatestMessageHistory", null, null);
+ assertNotNull(events);
+
+ // verify events span multiple routes (connected via seda which
creates independent exchanges)
+ Set<String> routeIds = events.stream()
+ .map(BacklogTracerEventMessage::getRouteId)
+ .collect(Collectors.toSet());
+ assertTrue(routeIds.contains("starter"), "Should contain starter
route events");
+ assertTrue(routeIds.contains("processor"), "Should contain
processor route events");
+
+ // all events should share the same breadcrumb ID
+ String breadcrumb = events.get(0).getBreadcrumbId();
+ assertNotNull(breadcrumb, "Breadcrumb ID should be set");
+ for (BacklogTracerEventMessage event : events) {
+ assertEquals(breadcrumb, event.getBreadcrumbId(),
+ "All events should share the same breadcrumb ID");
+ }
+
+ // verify events span multiple exchange IDs (seda creates new
exchanges)
+ Set<String> exchangeIds = events.stream()
+ .map(BacklogTracerEventMessage::getExchangeId)
+ .collect(Collectors.toSet());
+ assertTrue(exchangeIds.size() > 1,
+ "Should have events from multiple exchange IDs (seda
creates new exchanges)");
+ });
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ context.setUseBreadcrumb(true);
+ context.setBacklogTracing(true);
+ context.setMessageHistory(true);
+
+ from("direct:start").routeId("starter")
+ .to("mock:a").id("a")
+ .to("seda:next");
+
+ from("seda:next").routeId("processor")
+ .to("mock:b").id("b")
+ .to("mock:result").id("result");
+ }
+ };
+ }
+
+}