Author: andreasmyth
Date: Fri May 25 06:56:23 2007
New Revision: 541656
URL: http://svn.apache.org/viewvc?view=rev&rev=541656
Log:
* Defer creation of the RMEndpoint (and insertion into a strong hashmap) until
a) the first message is processed on an interceptor chain including RM interceptors
or b) there actually are messages to recover/resend from previous sessions.
* Delete unused sequences that were offered as part of CreateSequence requests
thereby eliminating the need to terminate them explicitly by sending an
out-of-band LastMessage and a subsequent TerminateSequence. This should address the
failure of the testTerminateOnShutdown test I disabled recently.
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
Fri May 25 06:56:23 2007
@@ -361,29 +361,36 @@
void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
if (null == store || null == retransmissionQueue) {
return;
- }
-
- RMEndpoint rme = createReliableEndpoint(endpoint);
- rme.initialise(conduit, null);
- reliableEndpoints.put(endpoint, rme);
+ }
String id = RMUtils.getEndpointIdentifier(endpoint);
- LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
- new Object[] {null == conduit ? "client" : "server", id});
+
Collection<SourceSequence> sss = store.getSourceSequences(id);
if (null == sss || 0 == sss.size()) {
return;
}
LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
- for (SourceSequence ss : sss) {
- rme.getSource().addSequence(ss, false);
+
+ RMEndpoint rme = null;
+
+ for (SourceSequence ss : sss) {
Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(),
true);
- if (null == ms) {
+ if (null == ms || 0 == ms.size()) {
continue;
}
LOG.log(Level.FINE, "Number of messages in sequence: {0}",
ms.size());
- for (RMMessage m : ms) {
+
+ if (null == rme) {
+ LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+ new Object[] {null == conduit ? "client" : "server",
id});
+ rme = createReliableEndpoint(endpoint);
+ rme.initialise(conduit, null);
+ reliableEndpoints.put(endpoint, rme);
+ }
+ rme.getSource().addSequence(ss, false);
+
+ for (RMMessage m : ms) {
Message message = new MessageImpl();
Exchange exchange = new ExchangeImpl();
@@ -415,7 +422,7 @@
message.setContent(byte[].class, m.getContent());
retransmissionQueue.addUnacknowledged(message);
- }
+ }
}
retransmissionQueue.start();
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
Fri May 25 06:56:23 2007
@@ -19,6 +19,7 @@
package org.apache.cxf.ws.rm;
+import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
@@ -215,18 +216,28 @@
// the following may be necessary if the last message for this
sequence was a oneway
// request and hence there was no response to which a last message
could have been added
- for (SourceSequence outboundSeq :
reliableEndpoint.getSource().getAllSequences()) {
+ // REVISIT: A last message for the correlated sequence should have
been sent by the time
+ // the last message for the underlying sequence was received.
+
+ Source source = reliableEndpoint.getSource();
+
+ for (SourceSequence outboundSeq : source.getAllSequences()) {
if (outboundSeq.offeredBy(sid) && !outboundSeq.isLastMessage()) {
+ if (BigInteger.ZERO.equals(outboundSeq.getCurrentMessageNr()))
{
+ source.removeSequence(outboundSeq);
+ }
// send an out of band message with an empty body and a
// sequence header containing a lastMessage element.
+ /*
Proxy proxy = new Proxy(reliableEndpoint);
try {
proxy.lastMessage(outboundSeq);
} catch (RMException ex) {
LogUtils.log(LOG, Level.SEVERE,
"CORRELATED_SEQ_TERMINATION_EXC", ex);
}
+ */
break;
}
Modified:
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
Fri May 25 06:56:23 2007
@@ -530,9 +530,7 @@
RetransmissionQueue queue =
control.createMock(RetransmissionQueue.class);
manager.setStore(store);
manager.setRetransmissionQueue(queue);
- manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
- RMEndpoint rme = control.createMock(RMEndpoint.class);
-
EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+
Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
if (null != ss) {
sss.add(ss);
@@ -540,11 +538,7 @@
EasyMock.expect(store.getSourceSequences("{S}s.{P}p")).andReturn(sss);
if (null == ss) {
return;
- }
- Source source = control.createMock(Source.class);
- EasyMock.expect(rme.getSource()).andReturn(source);
- source.addSequence(ss, false);
- EasyMock.expectLastCall();
+ }
Collection<RMMessage> ms = new ArrayList<RMMessage>();
if (null != m) {
@@ -557,6 +551,15 @@
if (null == m) {
return;
}
+
+ manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+
EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+ Source source = control.createMock(Source.class);
+ EasyMock.expect(rme.getSource()).andReturn(source);
+ source.addSequence(ss, false);
+ EasyMock.expectLastCall();
+
Service service = control.createMock(Service.class);
EasyMock.expect(endpoint.getService()).andReturn(service);
Binding binding = control.createMock(Binding.class);
Modified:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
(original)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Fri May 25 06:56:23 2007
@@ -1219,8 +1219,7 @@
mf.verifyLastMessage(new boolean[3], false);
mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
}
-
- @Ignore
+
@Test
public void testTerminateOnShutdown() throws Exception {
if (!doTestTerminateOnShutdown) {