This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.21.x by this push: new 9114dcc CAMEL-12451 Make sure the camel-cxf component release UoW in oneway invocation 9114dcc is described below commit 9114dccdabdfd070f93413bf7c92fc281c37252f Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Fri Apr 20 08:52:45 2018 +0800 CAMEL-12451 Make sure the camel-cxf component release UoW in oneway invocation --- .../apache/camel/component/cxf/CxfConsumer.java | 35 +++++----------- .../interceptors/UnitOfWorkCloserInterceptor.java | 47 ++++++++++++++++++++++ .../camel/component/cxf/jaxrs/CxfRsConsumer.java | 37 +++-------------- .../apache/camel/component/cxf/util/CxfUtils.java | 14 ++++++- .../camel/component/cxf/CxfOneWayRouteTest.java | 19 ++++++++- .../camel/component/cxf/CxfOneWayRouteBeans.xml | 6 ++- 6 files changed, 98 insertions(+), 60 deletions(-) diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 799f5d5..24003cb 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -21,6 +21,9 @@ import java.util.HashMap; import java.util.Map; import javax.xml.ws.WebFault; + +import org.apache.camel.component.cxf.interceptors.UnitOfWorkCloserInterceptor; +import org.apache.camel.component.cxf.util.CxfUtils; import org.w3c.dom.Element; import org.apache.camel.AsyncCallback; @@ -39,7 +42,6 @@ import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.FaultMode; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.AbstractPhaseInterceptor; import org.apache.cxf.phase.Phase; import org.apache.cxf.service.invoker.Invoker; import org.apache.cxf.service.model.BindingOperationInfo; @@ -83,38 +85,18 @@ public class CxfConsumer extends DefaultConsumer { 
final MessageObserver originalOutFaultObserver = server.getEndpoint().getOutFaultObserver(); server.getEndpoint().setOutFaultObserver(message -> { - Exchange cxfExchange = null; - if ((cxfExchange = message.getExchange()) != null) { - org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); - if (exchange != null) { - doneUoW(exchange); - } - } originalOutFaultObserver.onMessage(message); + CxfUtils.closeCamelUnitOfWork(message); }); + // setup the UnitOfWorkCloserInterceptor for OneWayMessageProcessor + server.getEndpoint().getInInterceptors().add(new UnitOfWorkCloserInterceptor(Phase.POST_INVOKE, true)); + // close the UnitOfWork normally server.getEndpoint().getOutInterceptors().add(new UnitOfWorkCloserInterceptor()); return server; } - //closes UnitOfWork in good case - private class UnitOfWorkCloserInterceptor extends AbstractPhaseInterceptor<Message> { - public UnitOfWorkCloserInterceptor() { - super(Phase.POST_LOGICAL_ENDING); - } - - @Override - public void handleMessage(Message message) throws Fault { - Exchange cxfExchange = null; - if ((cxfExchange = message.getExchange()) != null) { - org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); - if (exchange != null) { - doneUoW(exchange); - } - } - } - } public Server getServer() { return server; @@ -162,8 +144,9 @@ public class CxfConsumer extends DefaultConsumer { // we assume it should support AsyncInvocation out of box return true; } - + private class CxfConsumerInvoker implements Invoker { + private final CxfEndpoint endpoint; CxfConsumerInvoker(CxfEndpoint endpoint) { diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java new file mode 100644 index 0000000..8825403 --- /dev/null +++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java @@ -0,0 +1,47 @@ +package org.apache.camel.component.cxf.interceptors; + +import org.apache.camel.component.cxf.util.CxfUtils; +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.OutgoingChainInterceptor; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; +import org.apache.cxf.service.model.BindingOperationInfo; + +//closes UnitOfWork in good case +public class UnitOfWorkCloserInterceptor extends AbstractPhaseInterceptor<Message> { + boolean handleOneWayMessage; + + public UnitOfWorkCloserInterceptor(String phase, boolean handleOneWayMessage) { + super(phase); + // Just make sure this interceptor is add after the OutgoingChainInterceptor + if (phase.equals(Phase.POST_INVOKE)) { + addAfter(OutgoingChainInterceptor.class.getName()); + } + this.handleOneWayMessage = handleOneWayMessage; + } + public UnitOfWorkCloserInterceptor() { + super(Phase.POST_LOGICAL_ENDING); + } + + @Override + public void handleMessage(Message message) throws Fault { + if (handleOneWayMessage) { + if (isOneWay(message)) { + CxfUtils.closeCamelUnitOfWork(message); + } + } else { // Just do the normal process + CxfUtils.closeCamelUnitOfWork(message); + } + } + + private boolean isOneWay(Message message) { + Exchange ex = message.getExchange(); + BindingOperationInfo binding = ex.getBindingOperationInfo(); + if (null != binding && null != binding.getOperationInfo() && binding.getOperationInfo().isOneWay()) { + return true; + } + return false; + } +} diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java index e97caa9..4c68e49 100644 --- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.cxf.jaxrs; -import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.cxf.interceptors.UnitOfWorkCloserInterceptor; +import org.apache.camel.component.cxf.util.CxfUtils; import org.apache.camel.impl.DefaultConsumer; import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Server; -import org.apache.cxf.interceptor.Fault; -import org.apache.cxf.interceptor.OutFaultChainInitiatorObserver; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; -import org.apache.cxf.message.Message; -import org.apache.cxf.phase.AbstractPhaseInterceptor; import org.apache.cxf.phase.Phase; import org.apache.cxf.transport.MessageObserver; @@ -56,7 +53,9 @@ public class CxfRsConsumer extends DefaultConsumer { } svrBean.setInvoker(cxfRsInvoker); - + // setup the UnitOfWorkCloserInterceptor for OneWayMessageProcessor + svrBean.getInInterceptors().add(new UnitOfWorkCloserInterceptor(Phase.POST_INVOKE, true)); + // close the UnitOfWork normally svrBean.getOutInterceptors().add(new UnitOfWorkCloserInterceptor()); @@ -65,37 +64,13 @@ public class CxfRsConsumer extends DefaultConsumer { final MessageObserver originalOutFaultObserver = server.getEndpoint().getOutFaultObserver(); //proxy OutFaultObserver so we can close org.apache.camel.spi.UnitOfWork in case of error server.getEndpoint().setOutFaultObserver(message -> { - org.apache.cxf.message.Exchange cxfExchange = null; - if ((cxfExchange = message.getExchange()) != null) { - org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); - if (exchange != null) { - doneUoW(exchange); - } - } + CxfUtils.closeCamelUnitOfWork(message); originalOutFaultObserver.onMessage(message); }); return server; } - //closes UnitOfWork in good case - 
private class UnitOfWorkCloserInterceptor extends AbstractPhaseInterceptor<Message> { - public UnitOfWorkCloserInterceptor() { - super(Phase.POST_LOGICAL_ENDING); - } - - @Override - public void handleMessage(Message message) throws Fault { - org.apache.cxf.message.Exchange cxfExchange = null; - if ((cxfExchange = message.getExchange()) != null) { - org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); - if (exchange != null) { - doneUoW(exchange); - } - } - } - } - @Override protected void doStart() throws Exception { super.doStart(); diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java index 45894fb..23b185c 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java @@ -24,6 +24,9 @@ import java.util.Map.Entry; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; +import org.apache.camel.util.UnitOfWorkHelper; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; import org.w3c.dom.Element; import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; @@ -170,6 +173,15 @@ public final class CxfUtils { } } } - + + public static void closeCamelUnitOfWork(Message message) { + Exchange cxfExchange = null; + if ((cxfExchange = message.getExchange()) != null) { + org.apache.camel.Exchange exchange = cxfExchange.get(org.apache.camel.Exchange.class); + if (exchange != null) { + UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange); + } + } + } } diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java index 230c58f..9c16890 100644 --- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java @@ -26,6 +26,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.Synchronization; import org.apache.camel.test.spring.CamelSpringTestSupport; import org.apache.hello_world_soap_http.Greeter; import org.junit.Before; @@ -46,11 +47,13 @@ public class CxfOneWayRouteTest extends CamelSpringTestSupport { private static Exception bindingException; private static boolean bindingDone; - + private static boolean onCompeletedCalled; + @Before public void setup() { bindingException = null; bindingDone = false; + onCompeletedCalled = false; } @Override @@ -82,6 +85,7 @@ public class CxfOneWayRouteTest extends CamelSpringTestSupport { } assertMockEndpointsSatisfied(); + assertTrue("UnitOfWork done should be called", onCompeletedCalled); assertNull("exception occured: " + bindingException, bindingException); } @@ -97,10 +101,23 @@ public class CxfOneWayRouteTest extends CamelSpringTestSupport { bos.write(MAGIC); bos.write(msg.getBytes()); exchange.getIn().setBody(bos.toByteArray()); + // add compliation + exchange.getUnitOfWork().addSynchronization(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + onCompeletedCalled = true; + } + + @Override + public void onFailure(Exchange exchange) { + // do nothing here + } + }); } } public static class TestCxfBinding extends DefaultCxfBinding { + @Override public void populateCxfResponseFromExchange(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) { try { diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml index 
17fd4bc..a903d98 100644 --- a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml +++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml @@ -34,7 +34,11 @@ <cxf:cxfEndpoint id="routerEndpoint" address="http://localhost:${CXFTestSupport.port1}/CxfOneWayRouteTest/router" serviceClass="org.apache.hello_world_soap_http.GreeterImpl" endpointName="s:SoapPort" - serviceName="s:SOAPService"/> + serviceName="s:SOAPService"> + <cxf:properties> + <entry key="org.apache.cxf.oneway.robust" value="true"/> + </cxf:properties> + </cxf:cxfEndpoint> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> -- To stop receiving notification emails like this one, please contact ningji...@apache.org.