This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.21.x by this push:
     new 9114dcc  CAMEL-12451 Make sure the camel-cxf component release UoW in 
oneway invocation
9114dcc is described below

commit 9114dccdabdfd070f93413bf7c92fc281c37252f
Author: Willem Jiang <jiangni...@huawei.com>
AuthorDate: Fri Apr 20 08:52:45 2018 +0800

    CAMEL-12451 Make sure the camel-cxf component release UoW in oneway 
invocation
---
 .../apache/camel/component/cxf/CxfConsumer.java    | 35 +++++-----------
 .../interceptors/UnitOfWorkCloserInterceptor.java  | 47 ++++++++++++++++++++++
 .../camel/component/cxf/jaxrs/CxfRsConsumer.java   | 37 +++--------------
 .../apache/camel/component/cxf/util/CxfUtils.java  | 14 ++++++-
 .../camel/component/cxf/CxfOneWayRouteTest.java    | 19 ++++++++-
 .../camel/component/cxf/CxfOneWayRouteBeans.xml    |  6 ++-
 6 files changed, 98 insertions(+), 60 deletions(-)

diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 799f5d5..24003cb 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -21,6 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import javax.xml.ws.WebFault;
+
+import org.apache.camel.component.cxf.interceptors.UnitOfWorkCloserInterceptor;
+import org.apache.camel.component.cxf.util.CxfUtils;
 import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
@@ -39,7 +42,6 @@ import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.AbstractPhaseInterceptor;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.service.invoker.Invoker;
 import org.apache.cxf.service.model.BindingOperationInfo;
@@ -83,38 +85,18 @@ public class CxfConsumer extends DefaultConsumer {
 
         final MessageObserver originalOutFaultObserver = 
server.getEndpoint().getOutFaultObserver();
         server.getEndpoint().setOutFaultObserver(message -> {
-            Exchange cxfExchange = null;
-            if ((cxfExchange = message.getExchange()) != null) {
-                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
-                if (exchange != null) {
-                    doneUoW(exchange);
-                }
-            }
             originalOutFaultObserver.onMessage(message);
+            CxfUtils.closeCamelUnitOfWork(message);
         });
 
+        // setup the UnitOfWorkCloserInterceptor for OneWayMessageProcessor
+        server.getEndpoint().getInInterceptors().add(new 
UnitOfWorkCloserInterceptor(Phase.POST_INVOKE, true));
+        // close the UnitOfWork normally
         server.getEndpoint().getOutInterceptors().add(new 
UnitOfWorkCloserInterceptor());
 
         return server;
     }
 
-    //closes UnitOfWork in good case
-    private class UnitOfWorkCloserInterceptor extends 
AbstractPhaseInterceptor<Message> {
-        public UnitOfWorkCloserInterceptor() {
-            super(Phase.POST_LOGICAL_ENDING);
-        }
-
-        @Override
-        public void handleMessage(Message message) throws Fault {
-            Exchange cxfExchange = null;
-            if ((cxfExchange = message.getExchange()) != null) {
-                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
-                if (exchange != null) {
-                    doneUoW(exchange);
-                }
-            }
-        }
-    }
 
     public Server getServer() {
         return server;
@@ -162,8 +144,9 @@ public class CxfConsumer extends DefaultConsumer {
         // we assume it should support AsyncInvocation out of box
         return true;
     }
-    
+
     private class CxfConsumerInvoker implements Invoker {
+
         private final CxfEndpoint endpoint;
 
         CxfConsumerInvoker(CxfEndpoint endpoint) {
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java
new file mode 100644
index 0000000..8825403
--- /dev/null
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/UnitOfWorkCloserInterceptor.java
@@ -0,0 +1,47 @@
+package org.apache.camel.component.cxf.interceptors;
+
+import org.apache.camel.component.cxf.util.CxfUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.OutgoingChainInterceptor;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.service.model.BindingOperationInfo;
+
+//closes UnitOfWork in good case
+public class UnitOfWorkCloserInterceptor extends 
AbstractPhaseInterceptor<Message> {
+    boolean handleOneWayMessage;
+
+    public UnitOfWorkCloserInterceptor(String phase, boolean 
handleOneWayMessage) {
+        super(phase);
+        // Just make sure this interceptor is add after the 
OutgoingChainInterceptor
+        if (phase.equals(Phase.POST_INVOKE)) {
+            addAfter(OutgoingChainInterceptor.class.getName());
+        }
+        this.handleOneWayMessage = handleOneWayMessage;
+    }
+    public UnitOfWorkCloserInterceptor() {
+        super(Phase.POST_LOGICAL_ENDING);
+    }
+
+    @Override
+    public void handleMessage(Message message) throws Fault {
+        if (handleOneWayMessage) {
+            if (isOneWay(message)) {
+                CxfUtils.closeCamelUnitOfWork(message);
+            }
+        } else { // Just do the normal process
+            CxfUtils.closeCamelUnitOfWork(message);
+        }
+    }
+
+    private boolean isOneWay(Message message) {
+        Exchange ex = message.getExchange();
+        BindingOperationInfo binding = ex.getBindingOperationInfo();
+        if (null != binding && null != binding.getOperationInfo() && 
binding.getOperationInfo().isOneWay()) {
+           return true;
+        }
+        return false;
+    }
+}
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
index e97caa9..4c68e49 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.cxf.jaxrs;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.cxf.interceptors.UnitOfWorkCloserInterceptor;
+import org.apache.camel.component.cxf.util.CxfUtils;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Server;
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.OutFaultChainInitiatorObserver;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.AbstractPhaseInterceptor;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.transport.MessageObserver;
 
@@ -56,7 +53,9 @@ public class CxfRsConsumer extends DefaultConsumer {
         }
 
         svrBean.setInvoker(cxfRsInvoker);
-
+        // setup the UnitOfWorkCloserInterceptor for OneWayMessageProcessor
+        svrBean.getInInterceptors().add(new 
UnitOfWorkCloserInterceptor(Phase.POST_INVOKE, true));
+        // close the UnitOfWork normally
         svrBean.getOutInterceptors().add(new UnitOfWorkCloserInterceptor());
 
 
@@ -65,37 +64,13 @@ public class CxfRsConsumer extends DefaultConsumer {
         final MessageObserver originalOutFaultObserver = 
server.getEndpoint().getOutFaultObserver();
         //proxy OutFaultObserver so we can close 
org.apache.camel.spi.UnitOfWork in case of error
         server.getEndpoint().setOutFaultObserver(message -> {
-            org.apache.cxf.message.Exchange cxfExchange = null;
-            if ((cxfExchange = message.getExchange()) != null) {
-                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
-                if (exchange != null) {
-                    doneUoW(exchange);
-                }
-            }
+            CxfUtils.closeCamelUnitOfWork(message);
             originalOutFaultObserver.onMessage(message);
         });
 
         return server;
     }
 
-    //closes UnitOfWork in good case
-    private class UnitOfWorkCloserInterceptor extends 
AbstractPhaseInterceptor<Message> {
-        public UnitOfWorkCloserInterceptor() {
-            super(Phase.POST_LOGICAL_ENDING);
-        }
-
-        @Override
-        public void handleMessage(Message message) throws Fault {
-            org.apache.cxf.message.Exchange cxfExchange = null;
-            if ((cxfExchange = message.getExchange()) != null) {
-                org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
-                if (exchange != null) {
-                    doneUoW(exchange);
-                }
-            }
-        }
-    }
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java
index 45894fb..23b185c 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java
@@ -24,6 +24,9 @@ import java.util.Map.Entry;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 
+import org.apache.camel.util.UnitOfWorkHelper;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;
@@ -170,6 +173,15 @@ public final class CxfUtils {
             }
         }
     }
-    
 
+
+    public static void closeCamelUnitOfWork(Message message) {
+        Exchange cxfExchange = null;
+        if ((cxfExchange = message.getExchange()) != null) {
+            org.apache.camel.Exchange exchange = 
cxfExchange.get(org.apache.camel.Exchange.class);
+            if (exchange != null) {
+                UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange);
+            }
+        }
+    }
 }
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java
index 230c58f..9c16890 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfOneWayRouteTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.test.spring.CamelSpringTestSupport;
 import org.apache.hello_world_soap_http.Greeter;
 import org.junit.Before;
@@ -46,11 +47,13 @@ public class CxfOneWayRouteTest extends 
CamelSpringTestSupport {
 
     private static Exception bindingException;
     private static boolean bindingDone;
-    
+    private static boolean onCompeletedCalled;
+
     @Before
     public void setup() {
         bindingException = null;
         bindingDone = false;
+        onCompeletedCalled = false;
     }
     
     @Override
@@ -82,6 +85,7 @@ public class CxfOneWayRouteTest extends 
CamelSpringTestSupport {
         }
 
         assertMockEndpointsSatisfied();
+        assertTrue("UnitOfWork done should be called", onCompeletedCalled);
         assertNull("exception occured: " + bindingException, bindingException);
     }
     
@@ -97,10 +101,23 @@ public class CxfOneWayRouteTest extends 
CamelSpringTestSupport {
             bos.write(MAGIC);
             bos.write(msg.getBytes());
             exchange.getIn().setBody(bos.toByteArray());
+            // add compliation
+            exchange.getUnitOfWork().addSynchronization(new Synchronization() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    onCompeletedCalled = true;
+                }
+
+                @Override
+                public void onFailure(Exchange exchange) {
+                    // do nothing here
+                }
+            });
         }
     }
     
     public static class TestCxfBinding extends DefaultCxfBinding {
+
         @Override
         public void populateCxfResponseFromExchange(Exchange camelExchange, 
org.apache.cxf.message.Exchange cxfExchange) {
             try {
diff --git 
a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml
 
b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml
index 17fd4bc..a903d98 100644
--- 
a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml
+++ 
b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/CxfOneWayRouteBeans.xml
@@ -34,7 +34,11 @@
   <cxf:cxfEndpoint id="routerEndpoint" 
address="http://localhost:${CXFTestSupport.port1}/CxfOneWayRouteTest/router";
     serviceClass="org.apache.hello_world_soap_http.GreeterImpl"
     endpointName="s:SoapPort"
-    serviceName="s:SOAPService"/>
+    serviceName="s:SOAPService">
+    <cxf:properties>
+      <entry key="org.apache.cxf.oneway.robust" value="true"/>
+    </cxf:properties>
+  </cxf:cxfEndpoint>
 
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring";>
     <route>

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to