Hi Aki,

On reviewing the code, I think you'll also need to change Destination.acknowledge() for this to work correctly. Right now Destination.acknowledge() is what persists the received message to the store, so if it's not called until processing is complete, messages will never be persisted. The persistence step should be moved out into a separate method that RMInInterceptor can call.
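As a rough sketch of that split, using made-up stand-in classes rather than the real CXF types: SketchStore, SketchDestination, persist() and the two interceptor stand-ins are all hypothetical names, and the real Destination works on Message objects rather than bare message numbers. The only point is that persistence happens on receipt in both modes, while the acknowledgement is deferred in robust mode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the RM store; NOT the real CXF store API.
class SketchStore {
    final List<Long> persisted = new ArrayList<>();

    void persistIncoming(long messageNumber) {
        persisted.add(messageNumber);
    }
}

// Hypothetical stand-in for Destination, with persistence split out of acknowledge().
class SketchDestination {
    private final SketchStore store;
    final List<Long> acknowledged = new ArrayList<>();

    SketchDestination(SketchStore store) {
        this.store = store;
    }

    // New, separate method: writes the received message to the store.
    void persist(long messageNumber) {
        store.persistIncoming(messageNumber);
    }

    // Now only updates the sequence state; no store access here.
    void acknowledge(long messageNumber) {
        acknowledged.add(messageNumber);
    }
}

// Stand-in for the inbound RM interceptor: always persist, ack only if not robust.
class SketchInInterceptor {
    static void processSequence(SketchDestination dest, long messageNumber, boolean robust) {
        dest.persist(messageNumber);
        if (!robust) {
            dest.acknowledge(messageNumber);
        }
    }
}

// Stand-in for the delivery interceptor: in robust mode the ack happens here.
class SketchDeliveryInterceptor {
    static void handle(SketchDestination dest, long messageNumber, boolean robust) {
        if (robust) {
            dest.acknowledge(messageNumber);
        }
    }
}
```

With this shape, a robust one-way message is already in the store while it waits to be processed, and the acknowledgement still goes out only after delivery.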
Separately, it looks like we need to change the code to handle passing persisted messages on to the application when recovering from the store. It looks to me like at present messages will be acknowledged by the RM layer but never delivered to the application if there's a crash or shutdown while they're waiting to be processed.

What do you think?

  - Dennis

On 03/06/2012 08:51 PM, a...@apache.org wrote:
> Author: ay
> Date: Tue Mar 6 07:51:02 2012
> New Revision: 1297370
>
> URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
> Log:
> Merged revisions 1297296 via svnmerge from
> https://svn.apache.org/repos/asf/cxf/trunk
>
> ........
>   r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
>
>   [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the
> sequence until message delivery
> ........
>
> Added:
>
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>       - copied unchanged from r1297296,
> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>       - copied unchanged from r1297296,
> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>       - copied unchanged from r1297296,
> cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
> Modified:
>     cxf/branches/2.5.x-fixes/ (props changed)
>
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
>     svn:mergeinfo = /cxf/trunk:1297296
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
> Binary property 'svnmerge-integrated' - no diff available.
>
> Modified:
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
> URL:
> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> ---
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
> (original)
> +++
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
> Tue Mar 6 07:51:02 2012
> @@ -23,6 +23,7 @@ import java.util.logging.Logger;
>
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.phase.Phase;
>
>  /**
> @@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
>
>      public void handle(Message message) throws SequenceFault, RMException {
>          LOG.entering(getClass().getName(), "handleMessage");
> -        getManager().getDestination(message).processingComplete(message);
> +        Destination dest = getManager().getDestination(message);
> +        final boolean robust =
> +
> MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (robust) {
> +            dest.acknowledge(message);
> +        }
> +        dest.processingComplete(message);
>      }
>  }
>
> Modified:
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
> URL:
> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> ---
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
> (original)
> +++
> cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
> Tue Mar 6 07:51:02 2012
> @@ -25,6 +25,7 @@ import java.util.logging.Logger;
>
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
>  import org.apache.cxf.ws.addressing.ContextUtils;
>  import org.apache.cxf.ws.addressing.MAPAggregator;
> @@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
>
>      void processSequence(Destination destination, Message message)
>          throws SequenceFault, RMException {
> -        destination.acknowledge(message);
> +        final boolean robust =
> +
> MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (!robust) {
> +            destination.acknowledge(message);
> +        }
>      }
>
>      void processDeliveryAssurance(RMProperties rmps) {
>
> Modified:
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
> URL:
> http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> ---
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
> (original)
> +++
> cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
> Tue Mar 6 07:51:02 2012
> @@ -18,186 +18,12 @@
>   */
>  package org.apache.cxf.systest.ws.rm;
>
> -import java.net.MalformedURLException;
> -import java.util.logging.Logger;
> -
> -import javax.xml.ws.Endpoint;
> -
> -import org.apache.cxf.Bus;
> -import org.apache.cxf.BusFactory;
> -import org.apache.cxf.bus.spring.SpringBusFactory;
> -import org.apache.cxf.common.logging.LogUtils;
> -import org.apache.cxf.greeter_control.Control;
> -import org.apache.cxf.greeter_control.ControlService;
> -import org.apache.cxf.greeter_control.Greeter;
> -import org.apache.cxf.greeter_control.GreeterService;
> -import org.apache.cxf.greeter_control.types.FaultLocation;
> -import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
> -import org.apache.cxf.phase.Phase;
> -import org.apache.cxf.test.TestUtilities;
> -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
> -import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
> -import org.apache.cxf.ws.rm.RMManager;
> -
> -import org.junit.After;
> -import org.junit.AfterClass;
> -import org.junit.BeforeClass;
> -import org.junit.Test;
> -
>  /**
>   * Tests the acknowledgement delivery back to the non-decoupled port when
> there is some
>   * error at the provider side and how its behavior is affected by the robust
> in-only mode setting.
>   */
> -public class ServiceInvocationAckTest extends
> AbstractBusClientServerTestBase {
> -    public static final String PORT = allocatePort(Server.class);
> -
> -    private static final Logger LOG =
> LogUtils.getLogger(ServiceInvocationAckTest.class);
> -
> -    private static final String CONTROL_PORT_ADDRESS =
> -        "http://localhost:" + PORT + "/SoapContext/ControlPort";
> -
> -    public static class Server extends AbstractBusTestServerBase {
> -
> -        protected void run() {
> -            SpringBusFactory factory = new SpringBusFactory();
> -            Bus bus = factory.createBus();
> -            BusFactory.setDefaultBus(bus);
> -            setBus(bus);
> -
> -            ControlImpl implementor = new ControlImpl();
> -            implementor.setAddress("http://localhost:" + PORT +
> "/SoapContext/GreeterPort");
> -            GreeterImpl greeterImplementor = new GreeterImpl();
> -            implementor.setImplementor(greeterImplementor);
> -            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
> -            LOG.fine("Published control endpoint.");
> -        }
> -
> -        public static void main(String[] args) {
> -            try {
> -                Server s = new Server();
> -                s.start();
> -            } catch (Exception ex) {
> -                ex.printStackTrace();
> -                System.exit(-1);
> -            } finally {
> -                System.out.println("done!");
> -            }
> -        }
> -    }
> -
> -    private Bus controlBus;
> -    private Control control;
> -    private Bus greeterBus;
> -    private Greeter greeter;
> -
> -
> -    @BeforeClass
> -    public static void startServers() throws Exception {
> -        TestUtilities.setKeepAliveSystemProperty(false);
> -        assertTrue("server did not launch correctly",
> launchServer(Server.class, true));
> -    }
> -
> -    @AfterClass
> -    public static void cleanup() {
> -        TestUtilities.recoverKeepAliveSystemProperty();
> -    }
> -
> -    @After
> -    public void tearDown() {
> -        if (null != greeter) {
> -            assertTrue("Failed to stop greeter.", control.stopGreeter(null));
> -            greeterBus.shutdown(true);
> -            greeterBus = null;
> -        }
> -        if (null != control) {
> -            assertTrue("Failed to stop greeter", control.stopGreeter(null));
> -            controlBus.shutdown(true);
> -        }
> -    }
> -
> -    @Test
> -    public void testDefaultInvocationHandling() throws Exception {
> +public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
> +    protected void setupGreeter() throws Exception {
>          setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(false);
> -
> -        FaultLocation location = new
> org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty",
> manager.getRetransmissionQueue().isEmpty());
> -
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the
> delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty",
> manager.getRetransmissionQueue().isEmpty());
> -    }
> -
> -    @Test
> -    public void testRobustInvocationHandling() throws Exception {
> -        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(true);
> -
> -        FaultLocation location = new
> org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty",
> manager.getRetransmissionQueue().isEmpty());
> -
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the
> delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertFalse("RetransmissionQueue must not be empty",
> manager.getRetransmissionQueue().isEmpty());
> -
> -        location.setPhase(null);
> -        control.setFaultLocation(location);
> -
> -        // the retransmission succeeds and the invocation succeeds, the
> message is acked
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty",
> manager.getRetransmissionQueue().isEmpty());
> -
> -    }
> -
> -    private void setupGreeter(String cfgResource) throws
> NumberFormatException, MalformedURLException {
> -
> -        SpringBusFactory bf = new SpringBusFactory();
> -
> -        controlBus = bf.createBus();
> -        BusFactory.setDefaultBus(controlBus);
> -
> -        ControlService cs = new ControlService();
> -        control = cs.getControlPort();
> -        updateAddressPort(control, PORT);
> -
> -        assertTrue("Failed to start greeter",
> control.startGreeter(cfgResource));
> -
> -        greeterBus = bf.createBus(cfgResource);
> -        BusFactory.setDefaultBus(greeterBus);
> -        LOG.fine("Initialised greeter bus with configuration: " +
> cfgResource);
> -
> -        GreeterService gs = new GreeterService();
> -
> -        greeter = gs.getGreeterPort();
> -        updateAddressPort(greeter, PORT);
> -        LOG.fine("Created greeter client.");
> -
>      }
>  }
>
>
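
To illustrate the recovery concern raised at the top of this message, here is a minimal sketch of redelivering persisted messages on restart. It assumes a hypothetical RecoverySketch class that keeps persisted-but-undelivered messages in a map; none of these names come from CXF, and the real store and delivery path would look different.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a destination that recovers undelivered messages from its store.
class RecoverySketch {
    // Persisted messages that have not yet reached the application (number -> payload).
    private final Map<Long, String> pending = new LinkedHashMap<>();
    // Payloads handed to the application, in delivery order.
    final List<String> deliveredToApplication = new ArrayList<>();

    // Called when a message is received and written to the store.
    void persist(long messageNumber, String payload) {
        pending.put(messageNumber, payload);
    }

    // Hands one message to the application and removes it from the pending set.
    void deliver(long messageNumber) {
        String payload = pending.remove(messageNumber);
        if (payload != null) {
            deliveredToApplication.add(payload);
        }
    }

    // Called on restart: redeliver everything still pending; returns how many were redelivered.
    int recover() {
        List<Long> numbers = new ArrayList<>(pending.keySet());
        for (Long number : numbers) {
            deliver(number);
        }
        return numbers.size();
    }
}
```

The design point is only that recovery has to push pending messages back through the delivery path; restoring the sequence state alone would leave them acknowledged but undelivered.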