This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23692 in repository https://gitbox.apache.org/repos/asf/camel.git
commit fa3f97208e4692c5f08d43bb6221a6c4a586ec74 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Jun 5 07:43:01 2026 +0200 CAMEL-23692: BacklogTracer captures late-arriving async branch events When a multicast sends to multiple async endpoints (e.g. Kafka/SEDA), the second branch may fire its trace events after a new exchange has already claimed the provisional history queue. These late events were silently dropped because they matched neither the new exchange nor the append-mode conditions. Add a fallback clause that appends events matching lastCompletedBreadcrumbId directly to completeHistoryQueue, ensuring all async branches appear in the message history. Co-Authored-By: Claude Opus 4.6 <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../apache/camel/impl/debugger/BacklogTracer.java | 5 + ...cklogTracerMessageHistoryMulticastSedaTest.java | 103 +++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java index 873e31e628d1..2f3ee0d12726 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java @@ -245,6 +245,11 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp } provisionalHistoryQueue.clear(); } + } else if (lastCompletedBreadcrumbId != null && event.getBreadcrumbId() != null + && event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) { + // late-arriving event from a downstream route (e.g. second branch of a multicast via Kafka/SEDA) + // that arrived after the provisional queue was claimed by a new exchange + completeHistoryQueue.add(event); } } if (!enabled) { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java new file mode 100644 index 000000000000..5759a2c2673a --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.BacklogTracerEventMessage; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledOnOs(OS.AIX) +public class BacklogTracerMessageHistoryMulticastSedaTest extends ManagementTestSupport { + + @SuppressWarnings("unchecked") + @Test + public void testLateArrivingAsyncBranchCaptured() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on + = new ObjectName( + "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogTracer"); + assertNotNull(on); + assertTrue(mbeanServer.isRegistered(on)); + + mbeanServer.setAttribute(on, new javax.management.Attribute("RemoveOnDump", Boolean.FALSE)); + + getMockEndpoint("mock:fast").expectedMessageCount(2); + getMockEndpoint("mock:slow").expectedMessageCount(2); + + // send two exchanges in quick succession so the second claims the provisional queue + // before the slow branch of the first exchange finishes + template.sendBody("direct:start", "First"); + template.sendBody("direct:start", "Second"); + + assertMockEndpointsSatisfied(); + + // wait for the slow seda branch to complete and verify history includes both branches + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + List<BacklogTracerEventMessage> events + = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpLatestMessageHistory", null, null); + assertNotNull(events); + assertTrue(events.size() > 0, "Should have history events"); + + Set<String> routeIds = events.stream() + .map(BacklogTracerEventMessage::getRouteId) + .collect(Collectors.toSet()); + assertTrue(routeIds.contains("main"), "Should contain main route events"); + assertTrue(routeIds.contains("fast-route"), "Should contain fast-route events"); + assertTrue(routeIds.contains("slow-route"), + "Should contain slow-route events (late-arriving async branch)"); + }); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + context.setUseBreadcrumb(true); + context.setBacklogTracing(true); + context.setMessageHistory(true); + + from("direct:start").routeId("main") + .multicast().parallelProcessing() + .to("seda:fast", "seda:slow") + .end(); + + from("seda:fast").routeId("fast-route") + .to("mock:fast"); + + from("seda:slow").routeId("slow-route") + .delay(500) + .to("mock:slow"); + } + }; + } + +}
