This is an automated email from the ASF dual-hosted git repository.
ffang pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 45b359dac06 Camel 21302 (#16008)
45b359dac06 is described below
commit 45b359dac060b398879b09fe9e0e0f43827b7d73
Author: Freeman(Yue) Fang <[email protected]>
AuthorDate: Tue Oct 22 05:30:50 2024 -0400
Camel 21302 (#16008)
* camel-opentelemetry: add async CXF reproducer AsyncCxfTest
(cherry picked from commit 7d83a62b8e442dc9ac6fd79b153192add940301e)
* [CAMEL-21302] camel-opentelemetry context leak with direct async producer
* Revert "CAMEL-21309: camel-cxf - Force using sync client when using
tracing/opentelemetry as otherwise spans are not working correctly. (#15816)"
This reverts commit 45961c941382cbb43fe27cd861ab7c4e9fc86d8f.
* Revert "CAMEL-21309: camel-cxf - Force using sync client when using
tracing/opentelemetry as otherwise spans are not working correctly."
This reverts commit f5e2b26fdbeb5ffd494b3e4513bacc8ee53ad354.
* [CAMEL-21302] Remove unnecessary package import
---------
Co-authored-by: John Poth <[email protected]>
(cherry picked from commit 8788f722922d066cc314c44ed4e60735109e9f07)
(cherry picked from commit 002a6b891eed42d903719cde03730b5bd9e72728)
---
.../camel/component/cxf/jaxrs/CxfRsProducer.java | 23 +------
.../camel/component/cxf/jaxws/CxfProducer.java | 16 +----
.../camel/component/direct/DirectProducer.java | 7 ++
components/camel-opentelemetry/pom.xml | 37 +++++++++++
.../apache/camel/opentelemetry/AsyncCxfTest.java | 75 ++++++++++++++++++++++
.../camel/opentelemetry/CurrentSpanTest.java | 25 ++++++++
.../apache/camel/tracing/ActiveSpanManager.java | 8 +++
7 files changed, 156 insertions(+), 35 deletions(-)
diff --git
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 200f403312b..35029ec7c6f 100644
---
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -67,8 +67,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
private static final Logger LOG =
LoggerFactory.getLogger(CxfRsProducer.class);
- private static final String ACTIVE_SPAN_PROPERTY =
"OpenTracing.activeSpan";
-
private boolean throwException;
// using a cache of factory beans instead of setting the address of a
single cfb
@@ -110,17 +108,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- // if using camel-tracer then execute this synchronously due to
CXF-9063
- if (exchange.getProperty(ACTIVE_SPAN_PROPERTY) != null) {
- try {
- process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- callback.done(true);
- return true;
- }
-
try {
Message inMessage = exchange.getIn();
Boolean httpClientAPI =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class);
@@ -135,6 +122,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
return false;
} catch (Exception exception) {
+ LOG.error("Error invoking request", exception);
exchange.setException(exception);
callback.done(true);
return true;
@@ -142,8 +130,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeAsyncHttpClient(Exchange exchange, final
AsyncCallback callback) throws Exception {
- LOG.trace("Process exchange: {} (asynchronously)", exchange);
-
Message inMessage = exchange.getIn();
JAXRSClientFactoryBean cfb =
clientFactoryBeanCache.get(CxfRsEndpointUtils
.getEffectiveAddress(exchange, ((CxfRsEndpoint)
getEndpoint()).getAddress()));
@@ -206,8 +192,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeAsyncProxyClient(Exchange exchange, final
AsyncCallback callback) throws Exception {
- LOG.trace("Process exchange: {} (asynchronously)", exchange);
-
Message inMessage = exchange.getIn();
Object[] varValues =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME,
String.class);
@@ -283,6 +267,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void setupClientMatrix(WebClient client, Exchange exchange)
throws Exception {
+
org.apache.cxf.message.Message cxfMessage
= (org.apache.cxf.message.Message)
exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE);
if (cxfMessage != null) {
@@ -313,8 +298,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeHttpClient(Exchange exchange) throws Exception {
- LOG.trace("Process exchange: {} (synchronously)", exchange);
-
Message inMessage = exchange.getIn();
JAXRSClientFactoryBean cfb =
clientFactoryBeanCache.get(CxfRsEndpointUtils
.getEffectiveAddress(exchange, ((CxfRsEndpoint)
getEndpoint()).getAddress()));
@@ -446,8 +429,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeProxyClient(Exchange exchange) throws Exception {
- LOG.trace("Process exchange: {} (synchronously)", exchange);
-
Message inMessage = exchange.getIn();
Object[] varValues =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME,
String.class);
diff --git
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
index a770d232bed..fb8801cd612 100644
---
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
+++
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
@@ -59,8 +59,6 @@ public class CxfProducer extends DefaultAsyncProducer {
private static final Logger LOG =
LoggerFactory.getLogger(CxfProducer.class);
- private static final String ACTIVE_SPAN_PROPERTY =
"OpenTracing.activeSpan";
-
private Client client;
private CxfEndpoint endpoint;
@@ -101,18 +99,8 @@ public class CxfProducer extends DefaultAsyncProducer {
// so we don't delegate the sync process call to the async process
@Override
public boolean process(Exchange camelExchange, AsyncCallback callback) {
- // if using camel-tracer then execute this synchronously due to
CXF-9063
- if (camelExchange.getProperty(ACTIVE_SPAN_PROPERTY) != null) {
- try {
- process(camelExchange);
- } catch (Exception e) {
- camelExchange.setException(e);
- }
- callback.done(true);
- return true;
- }
+ LOG.trace("Process exchange: {} in an async way.", camelExchange);
- LOG.trace("Process exchange: {} (asynchronously)", camelExchange);
try {
// create CXF exchange
ExchangeImpl cxfExchange = new ExchangeImpl();
@@ -149,7 +137,7 @@ public class CxfProducer extends DefaultAsyncProducer {
*/
@Override
public void process(Exchange camelExchange) throws Exception {
- LOG.trace("Process exchange: {} (synchronously)", camelExchange);
+ LOG.trace("Process exchange: {} in sync way.", camelExchange);
// create CXF exchange
ExchangeImpl cxfExchange = new ExchangeImpl();
diff --git
a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 714d00e673e..ad85bc719b0 100644
---
a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++
b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
public class DirectProducer extends DefaultAsyncProducer {
private static final Logger LOG =
LoggerFactory.getLogger(DirectProducer.class);
+ private static final String ACTIVE_SPAN_PROPERTY =
"OpenTracing.activeSpan";
+ private static final String CLOSE_CLIENT_SCOPE =
"OpenTracing.closeClientScope";
private volatile DirectConsumer consumer;
private int stateCounter;
@@ -93,6 +95,11 @@ public class DirectProducer extends DefaultAsyncProducer {
callback.done(true);
return true;
} else {
+ //Ensure we can close the CLIENT Scope created by this
DirectProducer
+ //in the same thread
+ if (exchange.getProperty(ACTIVE_SPAN_PROPERTY) != null) {
+ exchange.setProperty(CLOSE_CLIENT_SCOPE, Boolean.TRUE);
+ }
return consumer.getAsyncProcessor().process(exchange,
callback);
}
}
diff --git a/components/camel-opentelemetry/pom.xml
b/components/camel-opentelemetry/pom.xml
index 8f72eb82982..481c51856fd 100644
--- a/components/camel-opentelemetry/pom.xml
+++ b/components/camel-opentelemetry/pom.xml
@@ -139,6 +139,43 @@
<version>${junit-pioneer-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cxf-rest</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cxf-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-undertow</artifactId>
+ <version>${cxf-version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-servlet-jakarta</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-undertow</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
new file mode 100644
index 00000000000..b14b833eb6c
--- /dev/null
+++
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.common.CXFTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+class AsyncCxfTest extends CamelOpenTelemetryTestSupport {
+
+ private static int port1 = CXFTestSupport.getPort1();
+
+ private static SpanTestData[] testdata = {}; // not used yet, fix context
leak first
+
+ AsyncCxfTest() {
+ super(testdata);
+ }
+
+ @Test
+ void testRoute() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(4);
+ int num = 4;
+ for (int i = 0; i < num; i++) {
+ template.requestBody("direct:start", "foo");
+ }
+ mock.assertIsSatisfied(5000);
+ verifyTraceSpanNumbers(num, 9);
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start").routeId("myRoute")
+ .to("direct:send")
+ .end();
+
+ from("direct:send")
+ .log("message")
+ .to("cxfrs:http://localhost:" + port1
+ + "/rest/helloservice/sayHello?synchronous=false");
+
+ restConfiguration()
+ .port(port1);
+
+ rest("/rest/helloservice")
+ .post("/sayHello").routeId("rest-GET-say-hi")
+ .to("direct:sayHi");
+
+ from("direct:sayHi")
+ .routeId("mock-GET-say-hi")
+ .log("example")
+ .to("mock:end");
+ }
+ };
+ }
+}
diff --git
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
index b2a70500593..5e7e7d2bcda 100644
---
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
+++
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
@@ -106,6 +106,27 @@ class CurrentSpanTest extends
CamelOpenTelemetryTestSupport {
}
+ @Test
+ void testDirectToDirectToAsync() {
+ SpanTestData[] expectedSpans = {
+ new
SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1")
+ .setKind(SpanKind.CLIENT),
+ new
SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2"),
+ new
SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2")
+ .setKind(SpanKind.CLIENT),
+ new
SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1"),
+ new
SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1").setKind(SpanKind.CLIENT)
+ };
+
+ // direct to direct to async pipeline
+ template.sendBody("direct:foo1", "Hello World");
+ awaitInvalidSpanContext();
+
+ List<SpanData> spans = verify(expectedSpans, false);
+ assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId());
+
+ }
+
@Test
void testAsyncToSync() {
// direct client spans (event spans) are not created, so we saw only
two spans in previous tests
@@ -201,6 +222,10 @@ class CurrentSpanTest extends
CamelOpenTelemetryTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
+ // direct to direct to async pipeline
+ from("direct:foo1").to("direct:foo2");
+ from("direct:foo2").to("asyncmock1:result");
+
// sync pipeline
from("direct:bar").to("syncmock:result");
diff --git
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
index 0969217289a..498b5518542 100644
---
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
+++
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
@@ -29,6 +29,7 @@ public final class ActiveSpanManager {
public static final String MDC_TRACE_ID = "trace_id";
public static final String MDC_SPAN_ID = "span_id";
private static final String ACTIVE_SPAN_PROPERTY =
"OpenTracing.activeSpan";
+ private static final String CLOSE_CLIENT_SCOPE =
"OpenTracing.closeClientScope";
private static final Logger LOG =
LoggerFactory.getLogger(ActiveSpanManager.class);
private ActiveSpanManager() {
@@ -56,6 +57,13 @@ public final class ActiveSpanManager {
* @param span The span
*/
public static void activate(Exchange exchange, SpanAdapter span) {
+ if (exchange.getProperty(CLOSE_CLIENT_SCOPE, Boolean.FALSE,
Boolean.class)) {
+ //Check if we need to close the CLIENT scope created by
+ //DirectProducer in async mode before we create a new INTERNAL
scope
+ //for the next DirectConsumer
+ endScope(exchange);
+ exchange.removeProperty(CLOSE_CLIENT_SCOPE);
+ }
exchange.setProperty(ACTIVE_SPAN_PROPERTY,
new Holder((Holder)
exchange.getProperty(ACTIVE_SPAN_PROPERTY), span));
if (exchange.getContext().isUseMDCLogging()) {