Author: andreasmyth
Date: Thu May 3 05:48:11 2007
New Revision: 534829
URL: http://svn.apache.org/viewvc?view=rev&rev=534829
Log:
[JIRA CXF-479] Server-side resend.
Also fixed the problem that caused the MultiClientOneway test to fail, and made
the address manipulation in Proxy for server-side-originated out-of-band RM
protocol messages thread-safe.
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
(original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
Thu May 3 05:48:11 2007
@@ -204,7 +204,7 @@
final
org.apache.cxf.ws.addressing.EndpointReferenceType address) {
ConduitSelector cs = new DeferredConduitSelector(conduit) {
@Override
- public Conduit selectConduit(Message message) {
+ public synchronized Conduit selectConduit(Message message) {
Conduit conduit = null;
EndpointInfo endpointInfo = getEndpoint().getEndpointInfo();
org.apache.cxf.ws.addressing.EndpointReferenceType original =
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
Thu May 3 05:48:11 2007
@@ -105,14 +105,16 @@
Identifier inSeqId = null;
BigInteger inMessageNumber = null;
- if (isApplicationMessage && !isPartialResponse) {
-
+ if (isApplicationMessage) {
rmpsIn =
(RMProperties)RMContextUtils.retrieveRMProperties(message, false);
-
if (null != rmpsIn && null != rmpsIn.getSequence()) {
inSeqId = rmpsIn.getSequence().getIdentifier();
inMessageNumber = rmpsIn.getSequence().getMessageNumber();
}
+ }
+
+ if (isApplicationMessage && !isPartialResponse) {
+
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("inbound sequence: " + (null == inSeqId ? "null" :
inSeqId.getValue()));
}
@@ -166,23 +168,30 @@
RMProperties rmpsOut,
Identifier inSeqId,
AttributedURI to) {
-
for (DestinationSequence seq : destination.getAllSequences()) {
- if (seq.sendAcknowledgement()
- &&
((seq.getAcksTo().getAddress().getValue().equals(RMUtils.getAddressingConstants()
- .getAnonymousURI()) &&
AbstractSequence.identifierEquals(seq.getIdentifier(),
-
inSeqId))
- ||
to.getValue().equals(seq.getAcksTo().getAddress().getValue()))) {
- rmpsOut.addAck(seq);
- } else if (LOG.isLoggable(Level.FINE)) {
- if (!seq.sendAcknowledgement()) {
+ if (!seq.sendAcknowledgement()) {
+ if (LOG.isLoggable(Level.FINE)) {
LOG.fine("no need to add acknowledgements for sequence "
- + seq.getIdentifier().getValue());
- } else {
+ + seq.getIdentifier().getValue());
+ }
+ continue;
+ }
+ if
(!to.getValue().equals(seq.getAcksTo().getAddress().getValue())) {
+ if (LOG.isLoggable(Level.FINE)) {
LOG.fine("sequences acksTo address (" +
seq.getAcksTo().getAddress().getValue()
- + ") does not match to address (" + to.getValue()
+ ")");
+ + ") does not match to address (" + to.getValue() +
")");
+ }
+ continue;
+ }
+ // there may be multiple sources with anonymous acksTo
+ if
(RMConstants.getAnonymousAddress().equals(seq.getAcksTo().getAddress().getValue())
+ && !AbstractSequence.identifierEquals(seq.getIdentifier(),
inSeqId)) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("sequence identifier does not match inbound
sequence identifier");
}
+ continue;
}
+ rmpsOut.addAck(seq);
}
if (LOG.isLoggable(Level.FINE)) {
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
---
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
(original)
+++
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
Thu May 3 05:48:11 2007
@@ -46,6 +46,7 @@
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.policy.AssertionInfo;
@@ -237,7 +238,6 @@
}
private void serverResend(Message message) {
- final Endpoint reliableEndpoint =
manager.getReliableEndpoint(message).getEndpoint();
// get the message's to address
@@ -254,9 +254,11 @@
final String address = to.getValue();
LOG.fine("Resending to address: " + address);
+ final Endpoint reliableEndpoint =
manager.getReliableEndpoint(message).getEndpoint();
+
ConduitSelector cs = new DeferredConduitSelector() {
@Override
- public Conduit selectConduit(Message message) {
+ public synchronized Conduit selectConduit(Message message) {
Conduit conduit = null;
EndpointInfo endpointInfo = reliableEndpoint.getEndpointInfo();
org.apache.cxf.ws.addressing.EndpointReferenceType original =
@@ -273,8 +275,16 @@
}
};
-
- Conduit c = cs.selectConduit(message);
+ cs.setEndpoint(reliableEndpoint);
+ Conduit c = cs.selectConduit(message);
+ // REVISIT
+ // use application endpoint message observer instead?
+ c.setMessageObserver(new MessageObserver() {
+ public void onMessage(Message message) {
+ LOG.fine("Ignoring response to resent message.");
+ }
+
+ });
resend(c, message);
}
Modified:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
(original)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Thu May 3 05:48:11 2007
@@ -989,7 +989,6 @@
mf.verifyActions(expectedActions, true);
}
- @Ignore
@Test
public void testMultiClientOneway() throws Exception {
if (!doTestMultiClientOneway) {
@@ -1164,7 +1163,6 @@
}
}
- @Ignore
@Test
public void testServerSideMessageLoss() throws Exception {
if (!doTestServerSideMessageLoss) {