This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2a025f2a96a0 CAMEL-23692: BacklogTracer captures late-arriving async
branch events (#23775)
2a025f2a96a0 is described below
commit 2a025f2a96a0cf206541bf620c9f1449ed4466d9
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 5 09:26:58 2026 +0200
CAMEL-23692: BacklogTracer captures late-arriving async branch events
(#23775)
* CAMEL-23692: BacklogTracer captures late-arriving async branch events
When a multicast sends to multiple async endpoints (e.g. Kafka/SEDA),
the second branch may fire its trace events after a new exchange has
already claimed the provisional history queue. These late events were
silently dropped because they matched neither the new exchange nor the
append-mode conditions.
Add a fallback clause that appends events whose breadcrumb id equals
lastCompletedBreadcrumbId directly to completeHistoryQueue, ensuring
all async branches appear in the message history.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
* CAMEL-23692: Simplify test to use single exchange
The previous test sent two exchanges to trigger the race condition, but
this caused the second exchange to clear completeHistoryQueue, making
assertions unreliable in CI. A single exchange with multicast to two
seda endpoints (one delayed) is sufficient to verify that both async
branches appear in the message history.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
---------
Signed-off-by: Claus Ibsen <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../apache/camel/impl/debugger/BacklogTracer.java | 5 ++
...cklogTracerMessageHistoryMulticastSedaTest.java | 100 +++++++++++++++++++++
2 files changed, 105 insertions(+)
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 873e31e628d1..2f3ee0d12726 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -245,6 +245,11 @@ public class BacklogTracer extends ServiceSupport
implements org.apache.camel.sp
}
provisionalHistoryQueue.clear();
}
+ } else if (lastCompletedBreadcrumbId != null &&
event.getBreadcrumbId() != null
+ &&
event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+ // late-arriving event from a downstream route (e.g. second
branch of a multicast via Kafka/SEDA)
+ // that arrived after the provisional queue was claimed by a
new exchange
+ completeHistoryQueue.add(event);
}
}
if (!enabled) {
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java
new file mode 100644
index 000000000000..df5270df87e1
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Sends one message through a parallel multicast to two seda endpoints, one of them delayed, and checks that the
+ * latest message history dumped from the BacklogTracer MBean has events from the starter route and both seda routes.
+ */
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerMessageHistoryMulticastSedaTest extends ManagementTestSupport {
+
+    @Test
+    public void testMulticastToSedaCapturesBothBranches() throws Exception {
+        MBeanServer server = getMBeanServer();
+        String tracerName = "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogTracer";
+        ObjectName on = new ObjectName(tracerName);
+        assertNotNull(on);
+        assertTrue(server.isRegistered(on));
+        server.setAttribute(on, new javax.management.Attribute("RemoveOnDump", Boolean.FALSE));
+
+        getMockEndpoint("mock:fast").expectedMessageCount(1);
+        getMockEndpoint("mock:slow").expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        // poll the tracer until the dumped history covers the starter route and both seda branches
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertHistoryCoversAllRoutes(server, on));
+    }
+
+    /**
+     * Dumps the latest message history from the tracer MBean and asserts it covers all three route ids.
+     */
+    @SuppressWarnings("unchecked")
+    private static void assertHistoryCoversAllRoutes(MBeanServer server, ObjectName tracer) throws Exception {
+        Object dumped = server.invoke(tracer, "dumpLatestMessageHistory", null, null);
+        List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) dumped;
+        assertNotNull(events);
+        assertTrue(!events.isEmpty(), "Should have history events");
+        Set<String> seen = events.stream().map(BacklogTracerEventMessage::getRouteId).collect(Collectors.toSet());
+        assertTrue(seen.contains("starter"), "Should contain starter route events");
+        assertTrue(seen.contains("fast-route"), "Should contain fast-route events");
+        assertTrue(seen.contains("slow-route"), "Should contain slow-route events (async branch via seda)");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                context.setUseBreadcrumb(true);
+                context.setBacklogTracing(true);
+                context.setMessageHistory(true);
+
+                // parallel multicast fans the message out to both seda queues
+                from("direct:start").routeId("starter")
+                        .multicast().parallelProcessing()
+                        .to("seda:fast", "seda:slow")
+                        .end();
+
+                // fast branch: straight to the mock
+                from("seda:fast").routeId("fast-route").to("mock:fast");
+
+                // slow branch: delayed before reaching the mock
+                from("seda:slow").routeId("slow-route").delay(500).to("mock:slow");
+            }
+        };
+    }
+}