This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a025f2a96a0 CAMEL-23692: BacklogTracer captures late-arriving async branch events (#23775)
2a025f2a96a0 is described below

commit 2a025f2a96a0cf206541bf620c9f1449ed4466d9
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 5 09:26:58 2026 +0200

    CAMEL-23692: BacklogTracer captures late-arriving async branch events (#23775)
    
    * CAMEL-23692: BacklogTracer captures late-arriving async branch events
    
    When a multicast sends to multiple async endpoints (e.g. Kafka/SEDA),
    the second branch may fire its trace events after a new exchange has
    already claimed the provisional history queue. These late events were
    silently dropped because they matched neither the new exchange nor the
    append-mode conditions.
    
    Add a fallback clause that appends events matching
    lastCompletedBreadcrumbId directly to completeHistoryQueue, ensuring
    all async branches appear in the message history.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
    
    * CAMEL-23692: Simplify test to use single exchange
    
    The previous test sent two exchanges to trigger the race condition but
    this caused the second exchange to clear completeHistoryQueue, making
    assertions unreliable in CI. A single exchange with multicast to two
    seda endpoints (one delayed) is sufficient to verify that both async
    branches appear in the message history.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
    
    ---------
    
    Signed-off-by: Claus Ibsen <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../apache/camel/impl/debugger/BacklogTracer.java  |   5 ++
 ...cklogTracerMessageHistoryMulticastSedaTest.java | 100 +++++++++++++++++++++
 2 files changed, 105 insertions(+)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 873e31e628d1..2f3ee0d12726 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -245,6 +245,11 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp
                     }
                     provisionalHistoryQueue.clear();
                 }
+            } else if (lastCompletedBreadcrumbId != null && event.getBreadcrumbId() != null
+                    && event.getBreadcrumbId().equals(lastCompletedBreadcrumbId)) {
+                // late-arriving event from a downstream route (e.g. second branch of a multicast via Kafka/SEDA)
+                // that arrived after the provisional queue was claimed by a new exchange
+                completeHistoryQueue.add(event);
             }
         }
         if (!enabled) {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java
new file mode 100644
index 000000000000..df5270df87e1
--- /dev/null
+++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryMulticastSedaTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerMessageHistoryMulticastSedaTest extends ManagementTestSupport {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMulticastToSedaCapturesBothBranches() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on
+                = new ObjectName(
+                        "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogTracer");
+        assertNotNull(on);
+        assertTrue(mbeanServer.isRegistered(on));
+
+        mbeanServer.setAttribute(on, new javax.management.Attribute("RemoveOnDump", Boolean.FALSE));
+
+        getMockEndpoint("mock:fast").expectedMessageCount(1);
+        getMockEndpoint("mock:slow").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // wait for the slow seda branch to complete and verify history includes both branches
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            List<BacklogTracerEventMessage> events
+                    = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpLatestMessageHistory", null, null);
+            assertNotNull(events);
+            assertTrue(events.size() > 0, "Should have history events");
+
+            Set<String> routeIds = events.stream()
+                    .map(BacklogTracerEventMessage::getRouteId)
+                    .collect(Collectors.toSet());
+            assertTrue(routeIds.contains("starter"), "Should contain starter route events");
+            assertTrue(routeIds.contains("fast-route"), "Should contain fast-route events");
+            assertTrue(routeIds.contains("slow-route"),
+                    "Should contain slow-route events (async branch via seda)");
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                context.setUseBreadcrumb(true);
+                context.setBacklogTracing(true);
+                context.setMessageHistory(true);
+
+                from("direct:start").routeId("starter")
+                        .multicast().parallelProcessing()
+                        .to("seda:fast", "seda:slow")
+                        .end();
+
+                from("seda:fast").routeId("fast-route")
+                        .to("mock:fast");
+
+                from("seda:slow").routeId("slow-route")
+                        .delay(500)
+                        .to("mock:slow");
+            }
+        };
+    }
+
+}

Reply via email to