Author: andreasmyth
Date: Thu Jan 11 09:46:07 2007
New Revision: 495306
URL: http://svn.apache.org/viewvc?view=rev&rev=495306
Log:
[JIRA CXF-272] Synchronization of sequence creation in the presence of multiple
concurrent application requests from the same client endpoint.
[JIRA CXF-279] Merging of acknowledgement ranges.
Modified:
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/BareInInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified:
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/BareInInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/BareInInterceptor.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/BareInInterceptor.java
(original)
+++
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/BareInInterceptor.java
Thu Jan 11 09:46:07 2007
@@ -143,6 +143,8 @@
}
paramNum++;
}
- message.setContent(List.class, parameters);
+ if (parameters.size() > 0) {
+ message.setContent(List.class, parameters);
+ }
}
}
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
Thu Jan 11 09:46:07 2007
@@ -147,7 +147,7 @@
range.setUpper(messageNumber);
acknowledgement.getAcknowledgementRange().add(i, range);
}
-
+ mergeRanges();
notifyAll();
}
@@ -155,6 +155,21 @@
scheduleAcknowledgement();
+ }
+
+ void mergeRanges() {
+ List<AcknowledgementRange> ranges =
acknowledgement.getAcknowledgementRange();
+ if (null == ranges) {
+ return;
+ }
+ for (int i = ranges.size() - 1; i > 0; i--) {
+ AcknowledgementRange current = ranges.get(i);
+ AcknowledgementRange previous = ranges.get(i - 1);
+ if
(current.getLower().subtract(previous.getUpper()).equals(BigInteger.ONE)) {
+ previous.setUpper(current.getUpper());
+ ranges.remove(i);
+ }
+ }
}
final void setDestination(Destination d) {
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
Thu Jan 11 09:46:07 2007
@@ -94,10 +94,6 @@
return timer;
}
- public synchronized RMEndpoint getReliableEndpoint(Endpoint e) {
- return reliableEndpoints.get(e);
- }
-
public synchronized RMEndpoint getReliableEndpoint(Message message) {
Endpoint endpoint = RMContextUtils.getEndpoint(message);
RMEndpoint rme = reliableEndpoints.get(endpoint);
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
Thu Jan 11 09:46:07 2007
@@ -116,21 +116,23 @@
// get the current sequence, requesting the creation of a new one
if necessary
- SourceSequence seq = getManager().getSequence(inSeqId, message,
maps);
- assert null != seq;
+ synchronized (source) {
+ SourceSequence seq = getManager().getSequence(inSeqId,
message, maps);
+ assert null != seq;
- // increase message number and store a sequence type object in
- // context
+ // increase message number and store a sequence type object in
+ // context
- seq.nextMessageNumber(inSeqId, inMessageNumber);
- rmpsOut.setSequence(seq);
+ seq.nextMessageNumber(inSeqId, inMessageNumber);
+ rmpsOut.setSequence(seq);
- // if this was the last message in the sequence, reset the
- // current sequence so that a new one will be created next
- // time the handler is invoked
+ // if this was the last message in the sequence, reset the
+ // current sequence so that a new one will be created next
+ // time the handler is invoked
- if (seq.isLastMessage()) {
- source.setCurrent(null);
+ if (seq.isLastMessage()) {
+ source.setCurrent(null);
+ }
}
} else {
if (!RMContextUtils.isRequestor(message)
Modified:
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
Thu Jan 11 09:46:07 2007
@@ -127,31 +127,6 @@
control.verify();
}
- public void testGetAcknowledgementAsStream() throws SequenceFault {
- /*
- destination.getHandler();
- expectLastCall().andReturn(handler).times(3);
- handler.getStore();
- expectLastCall().andReturn(null);
- handler.getConfigurationHelper();
- expectLastCall().andReturn(configurationHelper).times(2);
- configurationHelper.getRMAssertion();
- expectLastCall().andReturn(rma);
- configurationHelper.getAcksPolicy();
- expectLastCall().andReturn(ap);
- control.replay();
-
- DestinationSequence seq = new DestinationSequence(id, ref,
destination);
- List<AcknowledgementRange> ranges =
seq.getAcknowledgment().getAcknowledgementRange();
- assertEquals(0, ranges.size());
-
- seq.acknowledge(new BigInteger("1"));
- assertNotNull(seq.getAcknowledgmentAsStream());
-
- control.verify();
- */
- }
-
public void testAcknowledgeBasic() throws SequenceFault {
setUpDestination();
control.replay();
@@ -264,6 +239,49 @@
assertEquals(6, r.getUpper().intValue());
control.verify();
+ }
+
+ public void testMerge() {
+ DestinationSequence seq = new DestinationSequence(id, ref,
destination);
+ List<AcknowledgementRange> ranges =
seq.getAcknowledgment().getAcknowledgementRange();
+ AcknowledgementRange r;
+ for (int i = 0; i < 5; i++) {
+ r = new AcknowledgementRange();
+ r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
+ r.setUpper(new BigInteger(Integer.toString(3 * i + 3)));
+ ranges.add(r);
+ }
+ seq.mergeRanges();
+ assertEquals(1, ranges.size());
+ r = ranges.get(0);
+ assertEquals(BigInteger.ONE, r.getLower());
+ assertEquals(new BigInteger("15"), r.getUpper());
+ ranges.clear();
+ for (int i = 0; i < 5; i++) {
+ r = new AcknowledgementRange();
+ r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
+ r.setUpper(new BigInteger(Integer.toString(3 * i + 2)));
+ ranges.add(r);
+ }
+ seq.mergeRanges();
+ assertEquals(5, ranges.size());
+ ranges.clear();
+ for (int i = 0; i < 5; i++) {
+ if (i != 2) {
+ r = new AcknowledgementRange();
+ r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
+ r.setUpper(new BigInteger(Integer.toString(3 * i + 3)));
+ ranges.add(r);
+ }
+ }
+ seq.mergeRanges();
+ assertEquals(2, ranges.size());
+ r = ranges.get(0);
+ assertEquals(BigInteger.ONE, r.getLower());
+ assertEquals(new BigInteger("6"), r.getUpper());
+ r = ranges.get(1);
+ assertEquals(BigInteger.TEN, r.getLower());
+ assertEquals(new BigInteger("15"), r.getUpper());
}
public void testMonitor() throws SequenceFault {
Modified:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=495306&r1=495305&r2=495306
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
(original)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Thu Jan 11 09:46:07 2007
@@ -74,6 +74,7 @@
private boolean doTestOnewayMessageLoss = testAll;
private boolean doTestTwowayMessageLoss = testAll;
private boolean doTestTwowayNonAnonymousNoOffer = testAll;
+ private boolean doTestConcurrency = testAll;
public static void main(String[] args) {
junit.textui.TestRunner.run(SequenceTest.class);
@@ -636,6 +637,31 @@
mf.verifyActions(expectedActions, false);
mf.verifyMessageNumbers(new String[] {null, null, "1"}, false);
mf.verifyAcknowledgements(new boolean[] {false, false, false}, false);
+ }
+
+ public void testConcurrency() throws Exception {
+ if (!doTestConcurrency) {
+ return;
+ }
+ setupGreeter("org/apache/cxf/systest/ws/rm/concurrent.xml");
+
+ for (int i = 0; i < 5; i++) {
+ greeter.greetMeAsync(Integer.toString(i));
+ }
+
+ // CreateSequence and five greetMe messages
+ // full and partial responses to each
+
+ awaitMessages(6, 12, 7500);
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(),
inRecorder.getInboundMessages());
+
+ mf.verifyMessages(6, true);
+ String[] expectedActions = new String[6];
+ expectedActions[0] = RMConstants.getCreateSequenceAction();
+ for (int i = 1; i < expectedActions.length; i++) {
+ expectedActions[i] = GREETME_ACTION;
+ }
+ mf.verifyActions(expectedActions, true);
}
// --- test utilities ---