Author: ffang Date: Wed Nov 24 08:41:56 2010 New Revision: 1038524 URL: http://svn.apache.org/viewvc?rev=1038524&view=rev Log: Merged revisions 1038519 via svnmerge from https://svn.apache.org/repos/asf/cxf/branches/2.3.x-fixes
................ r1038519 | ffang | 2010-11-24 16:18:31 +0800 (三, 24 11 2010) | 9 lines Merged revisions 1038509 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r1038509 | ffang | 2010-11-24 15:42:49 +0800 (三, 24 11 2010) | 1 line [CXF-3114]WS-RM's RMTxStore's does not recover stored sequences after restart ........ ................ Modified: cxf/branches/2.2.x-fixes/ (props changed) cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Propchange: cxf/branches/2.2.x-fixes/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1038524&r1=1038523&r2=1038524&view=diff ============================================================================== --- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original) +++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Nov 24 08:41:56 2010 @@ -394,12 +394,18 @@ public class RMManager implements Server String id = RMUtils.getEndpointIdentifier(endpoint); Collection<SourceSequence> sss = store.getSourceSequences(id); - if (null == sss || 0 == sss.size()) { + Collection<DestinationSequence> dss = store.getDestinationSequences(id); + if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) { return; } LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size()); + LOG.log(Level.FINE, "Number of destination sequences: {0}", dss.size()); - RMEndpoint rme = null; + LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}", + new Object[] {null == conduit ? "client" : "server", id}); + RMEndpoint rme = createReliableEndpoint(endpoint); + rme.initialise(conduit, null); + reliableEndpoints.put(endpoint, rme); for (SourceSequence ss : sss) { Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true); @@ -408,13 +414,6 @@ public class RMManager implements Server } LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size()); - if (null == rme) { - LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}", - new Object[] {null == conduit ? "client" : "server", id}); - rme = createReliableEndpoint(endpoint); - rme.initialise(conduit, null); - reliableEndpoints.put(endpoint, rme); - } rme.getSource().addSequence(ss, false); for (RMMessage m : ms) { @@ -457,6 +456,10 @@ public class RMManager implements Server retransmissionQueue.addUnacknowledged(message); } } + + for (DestinationSequence ds : dss) { + rme.getDestination().addSequence(ds, false); + } retransmissionQueue.start(); } Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1038524&r1=1038523&r2=1038524&view=diff ============================================================================== --- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original) +++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Nov 24 08:41:56 2010 @@ -604,7 +604,7 @@ public class RMTxStore implements RMStor } try { - connection.setAutoCommit(false); + connection.setAutoCommit(true); createTables(); } catch (SQLException ex) { LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex); @@ -614,7 +614,14 @@ public class RMTxStore implements RMStor LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", se); } throw new RMStoreException(ex); - } + } finally { + try { + connection.setAutoCommit(false); + } catch (SQLException ex) { + LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex); + throw new RMStoreException(ex); + } + } } Connection getConnection() { Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff ============================================================================== --- cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original) +++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Wed Nov 24 08:41:56 2010 @@ -491,7 +491,7 @@ public class RMManagerTest extends Asser InterfaceInfo ii = control.createMock(InterfaceInfo.class); setUpEndpointForRecovery(endpoint, ei, si, bi, ii); Conduit conduit = control.createMock(Conduit.class); - setUpRecoverReliableEndpoint(endpoint, conduit, null, null); + setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); @@ -499,7 +499,8 @@ public class RMManagerTest extends Asser control.reset(); setUpEndpointForRecovery(endpoint, ei, si, bi, ii); SourceSequence ss = control.createMock(SourceSequence.class); - setUpRecoverReliableEndpoint(endpoint, conduit, ss, null); + DestinationSequence ds = control.createMock(DestinationSequence.class); + setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); @@ -507,7 +508,7 @@ public class RMManagerTest extends Asser control.reset(); setUpEndpointForRecovery(endpoint, ei, si, bi, ii); RMMessage m = control.createMock(RMMessage.class); - setUpRecoverReliableEndpoint(endpoint, conduit, ss, m); + setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m); control.replay(); manager.recoverReliableEndpoint(endpoint, conduit); control.verify(); @@ -529,7 +530,7 @@ public class RMManagerTest extends Asser void setUpRecoverReliableEndpoint(Endpoint endpoint, Conduit conduit, SourceSequence ss, - RMMessage m) { + DestinationSequence ds, RMMessage m) { RMStore store = control.createMock(RMStore.class); RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); manager.setStore(store); @@ -544,6 +545,14 @@ public class RMManagerTest extends Asser return; } + Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>(); + if (null != ds) { + dss.add(ds); + } + EasyMock.expect(store.getDestinationSequences("{S}s.{P}p")).andReturn(dss); + if (null == ds) { + return; + } Collection<RMMessage> ms = new ArrayList<RMMessage>(); if (null != m) { ms.add(m); @@ -552,25 +561,29 @@ public class RMManagerTest extends Asser id.setValue("S1"); EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2); EasyMock.expect(store.getMessages(id, true)).andReturn(ms); - if (null == m) { - return; - } + manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>()); RMEndpoint rme = control.createMock(RMEndpoint.class); EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme); Source source = control.createMock(Source.class); - EasyMock.expect(rme.getSource()).andReturn(source); - source.addSequence(ss, false); + EasyMock.expect(rme.getSource()).andReturn(source).anyTimes(); + + Destination destination = control.createMock(Destination.class); + EasyMock.expect(rme.getDestination()).andReturn(destination); + destination.addSequence(ds, false); EasyMock.expectLastCall(); Service service = control.createMock(Service.class); - EasyMock.expect(endpoint.getService()).andReturn(service); + EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes(); Binding binding = control.createMock(Binding.class); - EasyMock.expect(endpoint.getBinding()).andReturn(binding); + EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes(); - EasyMock.expect(ss.isLastMessage()).andReturn(true); - EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN); + EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes(); + EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN).anyTimes(); + if (null == m) { + return; + } EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2); if (null == conduit) { EasyMock.expect(m.getTo()).andReturn("toAddress"); Modified: cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff ============================================================================== --- cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original) +++ cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Wed Nov 24 08:41:56 2010 @@ -71,7 +71,8 @@ public class ServerPersistenceTest exten private OutMessageRecorder out; private InMessageRecorder in; - + private Bus greeterBus; + @BeforeClass public static void startServers() throws Exception { RMTxStore.deleteDatabaseFiles(); @@ -104,7 +105,7 @@ public class ServerPersistenceTest exten assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG)); LOG.fine("Started greeter server."); - Bus greeterBus = new SpringBusFactory().createBus(CFG); + greeterBus = new SpringBusFactory().createBus(CFG); LOG.fine("Created bus " + greeterBus + " with cfg : " + CFG); BusFactory.setDefaultBus(greeterBus); @@ -132,7 +133,7 @@ public class ServerPersistenceTest exten LOG.fine("Configured greeter client."); - Response<GreetMeResponse> responses[] = cast(new Response[3]); + Response<GreetMeResponse> responses[] = cast(new Response[4]); responses[0] = greeter.greetMeAsync("one"); responses[1] = greeter.greetMeAsync("two"); @@ -151,6 +152,12 @@ public class ServerPersistenceTest exten verifyServerRecovery(responses); + out.getOutboundMessages().clear(); + in.getInboundMessages().clear(); + + responses[3] = greeter.greetMeAsync("four"); + verifyRetransmissionQueue(); + greeterBus.shutdown(true); control.stopGreeter(CFG); @@ -163,7 +170,7 @@ public class ServerPersistenceTest exten // wait another while to prove that response to second request is indeed lost Thread.sleep(4000); int nDone = 0; - for (int i = 0; i < responses.length; i++) { + for (int i = 0; i < 3; i++) { if (responses[i].isDone()) { nDone++; } @@ -197,7 +204,7 @@ public class ServerPersistenceTest exten int nDone = 0; long waited = 0; while (waited < 5000) { - for (int i = 0; i < responses.length; i++) { + for (int i = 0; i < responses.length - 1; i++) { if (responses[i].isDone()) { nDone++; } @@ -228,6 +235,13 @@ public class ServerPersistenceTest exten } + void verifyRetransmissionQueue() throws Exception { + awaitMessages(1, 3, 40000); + + boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty(); + assertTrue("Retransmission Queue is not empty", empty); + } + protected void awaitMessages(int nExpectedOut, int nExpectedIn) { awaitMessages(nExpectedOut, nExpectedIn, 10000); }
