Author: dkulp
Date: Fri Mar 21 09:21:36 2008
New Revision: 639697
URL: http://svn.apache.org/viewvc?rev=639697&view=rev
Log:
Merged revisions 639047 via svnmerge from
https://svn.apache.org/repos/asf/incubator/cxf/trunk
........
r639047 | dkulp | 2008-03-19 18:44:58 -0400 (Wed, 19 Mar 2008) | 2 lines
[CXF-1398] Fixes to get the local transport working with MTOM stuff.
........
Modified:
incubator/cxf/branches/2.0.x-fixes/ (props changed)
incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java
incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java
incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java
incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
Propchange: incubator/cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java Fri Mar 21 09:21:36 2008
@@ -18,6 +18,9 @@
*/
package org.apache.cxf.binding.object;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
@@ -25,16 +28,22 @@
import org.apache.cxf.phase.Phase;
import org.apache.cxf.service.model.BindingOperationInfo;
import org.apache.cxf.transport.local.LocalConduit;
+import org.apache.cxf.transport.local.LocalTransportFactory;
public class ObjectDispatchOutInterceptor extends
AbstractPhaseInterceptor<Message> {
-
+ private Set<String> includes = new HashSet<String>();
+
public ObjectDispatchOutInterceptor() {
super(Phase.SETUP);
+ includes.add(ObjectBinding.OPERATION);
+ includes.add(ObjectBinding.BINDING);
}
public void handleMessage(Message message) throws Fault {
Exchange ex = message.getExchange();
message.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+ message.put(LocalTransportFactory.MESSAGE_INCLUDE_PROPERTIES,
+ includes);
BindingOperationInfo bop = ex.get(BindingOperationInfo.class);
message.put(ObjectBinding.OPERATION, bop.getName());
Modified: incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java Fri Mar 21 09:21:36 2008
@@ -39,7 +39,6 @@
import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.ConduitInitiatorManager;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.local.LocalConduit;
import org.apache.cxf.transport.local.LocalTransportFactory;
import org.junit.Test;
@@ -67,19 +66,20 @@
BindingInfo bi = serviceInfo.getBindings().iterator().next();
BindingOperationInfo bop = bi.getOperations().iterator().next();
- assertNotNull(bop.getOperationInfo());
-
+ assertNotNull(bop.getOperationInfo());
+
MessageImpl m = new MessageImpl();
m.setContent(List.class, content);
- m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
- m.put(ObjectBinding.BINDING, bop.getBinding().getName());
- m.put(ObjectBinding.OPERATION, bop.getName());
-
ExchangeImpl ex = new ExchangeImpl();
ex.setInMessage(m);
+ ex.put(BindingOperationInfo.class, bop);
Conduit c = getLocalConduit("local://" + server);
ex.setConduit(c);
+
+ new ObjectDispatchOutInterceptor().handleMessage(m);
+
+
c.setMessageObserver(new MessageObserver() {
public void onMessage(Message message) {
Modified: incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java Fri Mar 21 09:21:36 2008
@@ -67,16 +67,19 @@
assertNotNull(bop.getOperationInfo());
+
MessageImpl m = new MessageImpl();
m.setContent(List.class, content);
- m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
- m.put(ObjectBinding.BINDING, bop.getBinding().getName());
- m.put(ObjectBinding.OPERATION, bop.getName());
-
ExchangeImpl ex = new ExchangeImpl();
ex.setInMessage(m);
+ ex.put(BindingOperationInfo.class, bop);
Conduit c = getLocalConduit("local://Echo");
+ ex.setConduit(c);
+
+ new ObjectDispatchOutInterceptor().handleMessage(m);
+
+
ex.setConduit(c);
c.setMessageObserver(new MessageObserver() {
Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Fri Mar 21 09:21:36 2008
@@ -24,12 +24,10 @@
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.Set;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.AbstractWrappedOutputStream;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -43,7 +41,7 @@
public static final String RESPONSE_CONDUIT = LocalConduit.class.getName() + ".inConduit";
public static final String IN_EXCHANGE = LocalConduit.class.getName() + ".inExchange";
public static final String DIRECT_DISPATCH = LocalConduit.class.getName() + ".directDispatch";
- public static final String MESSAGE_FILTER_PROPERTIES = LocalConduit.class.getName() + ".filterProperties";
+ public static final String MESSAGE_FILTER_PROPERTIES = LocalTransportFactory.MESSAGE_FILTER_PROPERTIES;
private static final Logger LOG = LogUtils.getL7dLogger(LocalConduit.class);
@@ -86,7 +84,8 @@
copy.put(IN_CONDUIT, this);
copy.setDestination(destination);
- copy(message, copy, transportFactory.getMessageFilterProperties());
+ transportFactory.copy(message, copy);
+ MessageImpl.copyContent(message, copy);
CachedOutputStream stream = (CachedOutputStream)message.getContent(OutputStream.class);
copy.setContent(InputStream.class, stream.getInputStream());
@@ -101,24 +100,8 @@
destination.getMessageObserver().onMessage(copy);
}
- public static void copy(Message message, MessageImpl copy, Set<String> defaultFilter) {
- Set<String> filter = CastUtils.cast((Set)message.get(MESSAGE_FILTER_PROPERTIES));
- if (filter == null) {
- filter = defaultFilter;
- }
-
- // copy all the contents
- for (Map.Entry<String, Object> e : message.entrySet()) {
- if (!filter.contains(e.getKey())) {
- copy.put(e.getKey(), e.getValue());
- }
- }
-
- MessageImpl.copyContent(message, copy);
- }
private void dispatchViaPipe(final Message message) throws IOException {
- final PipedInputStream stream = new PipedInputStream();
final LocalConduit conduit = this;
final Exchange exchange = message.getExchange();
@@ -127,24 +110,32 @@
+
destination.getAddress().getAddress().getValue());
}
- final Runnable receiver = new Runnable() {
- public void run() {
- MessageImpl inMsg = new MessageImpl();
- inMsg.setContent(InputStream.class, stream);
- inMsg.setDestination(destination);
- inMsg.put(IN_CONDUIT, conduit);
-
- ExchangeImpl ex = new ExchangeImpl();
- ex.setInMessage(inMsg);
- ex.put(IN_EXCHANGE, exchange);
- destination.getMessageObserver().onMessage(inMsg);
- }
- };
-
- message.setContent(OutputStream.class, new PipedOutputStream(stream));
-
- // TODO: put on executor
- new Thread(receiver).start();
+
+ AbstractWrappedOutputStream cout
+ = new AbstractWrappedOutputStream() {
+ protected void onFirstWrite() throws IOException {
+ final PipedInputStream stream = new PipedInputStream();
+ wrappedStream = new PipedOutputStream(stream);
+
+ final Runnable receiver = new Runnable() {
+ public void run() {
+ MessageImpl inMsg = new MessageImpl();
+ transportFactory.copy(message, inMsg);
+ inMsg.setContent(InputStream.class, stream);
+ inMsg.setDestination(destination);
+ inMsg.put(IN_CONDUIT, conduit);
+
+ ExchangeImpl ex = new ExchangeImpl();
+ ex.setInMessage(inMsg);
+ ex.put(IN_EXCHANGE, exchange);
+ destination.getMessageObserver().onMessage(inMsg);
+ }
+ };
+
+ new Thread(receiver).start();
+ }
+ };
+ message.setContent(OutputStream.class, cout);
}
protected Logger getLogger() {
Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java Fri Mar 21 09:21:36 2008
@@ -27,6 +27,7 @@
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractWrappedOutputStream;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
@@ -66,7 +67,7 @@
return null;
}
- static class SynchronousConduit extends AbstractConduit {
+ class SynchronousConduit extends AbstractConduit {
private LocalConduit conduit;
public SynchronousConduit(LocalConduit conduit) {
@@ -77,22 +78,31 @@
public void prepare(final Message message) throws IOException {
if (!Boolean.TRUE.equals(message.getExchange().get(LocalConduit.DIRECT_DISPATCH))) {
final Exchange exchange = (Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
-
- final PipedInputStream stream = new PipedInputStream();
- final Runnable receiver = new Runnable() {
- public void run() {
- MessageImpl m = new MessageImpl();
- if (exchange != null) {
- exchange.setInMessage(m);
- }
- m.setContent(InputStream.class, stream);
- conduit.getMessageObserver().onMessage(m);
- }
- };
+
+ AbstractWrappedOutputStream cout
+ = new AbstractWrappedOutputStream() {
+ protected void onFirstWrite() throws IOException {
+ final PipedInputStream stream = new PipedInputStream();
+ wrappedStream = new PipedOutputStream(stream);
- PipedOutputStream outStream = new PipedOutputStream(stream);
- message.setContent(OutputStream.class, outStream);
- new Thread(receiver).start();
+ final Runnable receiver = new Runnable() {
+ public void run() {
+ MessageImpl m = new MessageImpl();
+ localDestinationFactory.copy(message, m);
+
+ if (exchange != null) {
+ exchange.setInMessage(m);
+ }
+ m.setContent(InputStream.class, stream);
+ conduit.getMessageObserver().onMessage(m);
+ }
+ };
+
+ new Thread(receiver).start();
+ }
+ };
+
+ message.setContent(OutputStream.class, cout);
} else {
CachedOutputStream stream = new CachedOutputStream();
Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?rev=639697&r1=639696&r2=639697&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java Fri Mar 21 09:21:36 2008
@@ -32,6 +32,7 @@
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractTransportFactory;
@@ -46,6 +47,10 @@
implements DestinationFactory, ConduitInitiator {
public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/local";
+ public static final String MESSAGE_FILTER_PROPERTIES
+ = LocalTransportFactory.class.getName() + ".filterProperties";
+ public static final String MESSAGE_INCLUDE_PROPERTIES
+ = LocalTransportFactory.class.getName() + ".includeProperties";
private static final Logger LOG = LogUtils.getL7dLogger(LocalTransportFactory.class);
private static final Set<String> URI_PREFIXES = new HashSet<String>();
@@ -58,6 +63,7 @@
private Bus bus;
private Set<String> messageFilterProperties;
+ private Set<String> messageIncludeProperties;
public LocalTransportFactory() {
super();
@@ -66,7 +72,14 @@
setTransportIds(ids);
messageFilterProperties = new HashSet<String>();
- messageFilterProperties.add(Message.REQUESTOR_ROLE);
+ messageIncludeProperties = new HashSet<String>();
+ messageFilterProperties.add(Message.REQUESTOR_ROLE);
+
+ messageIncludeProperties.add(Message.PROTOCOL_HEADERS);
+ messageIncludeProperties.add(Message.ENCODING);
+ messageIncludeProperties.add(Message.CONTENT_TYPE);
+ messageIncludeProperties.add(Message.ACCEPT_CONTENT_TYPE);
+ messageIncludeProperties.add(Message.RESPONSE_CODE);
}
@Resource(name = "bus")
@@ -126,8 +139,36 @@
return messageFilterProperties;
}
- public void setMessageFilterProperties(Set<String> messageFilterProperties) {
- this.messageFilterProperties = messageFilterProperties;
+ public void setMessageFilterProperties(Set<String> props) {
+ this.messageFilterProperties = props;
+ }
+ public Set<String> getIncludeMessageProperties() {
+ return messageIncludeProperties;
}
+ public void setMessageIncludeProperties(Set<String> props) {
+ this.messageIncludeProperties = props;
+ }
+
+
+ public void copy(Message message, Message copy) {
+ Set<String> filter = CastUtils.cast((Set)message.get(MESSAGE_FILTER_PROPERTIES));
+ if (filter == null) {
+ filter = messageFilterProperties;
+ }
+
+ Set<String> includes = CastUtils.cast((Set)message.get(MESSAGE_INCLUDE_PROPERTIES));
+ if (includes == null) {
+ includes = messageIncludeProperties;
+ }
+
+ // copy all the contents
+ for (Map.Entry<String, Object> e : message.entrySet()) {
+ if ((includes.contains(e.getKey())
+ || messageIncludeProperties.contains(e.getKey()))
+ && !filter.contains(e.getKey())) {
+ copy.put(e.getKey(), e.getValue());
+ }
+ }
+ }
}