Author: ningjiang
Date: Wed Oct 10 23:39:41 2007
New Revision: 583696
URL: http://svn.apache.org/viewvc?rev=583696&view=rev
Log:
CXF-1107 Fixed LocalTransport DirectDispatch NPE issue
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
(with props)
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
(with props)
Modified:
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
Modified:
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?rev=583696&r1=583695&r2=583696&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
(original)
+++
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
Wed Oct 10 23:39:41 2007
@@ -58,6 +58,11 @@
public void prepare(final Message message) throws IOException {
if (!Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))) {
dispatchViaPipe(message);
+ } else {
+ // prepare the stream here
+ PipedInputStream stream = new PipedInputStream();
+ message.setContent(InputStream.class, stream);
+ message.setContent(OutputStream.class, new PipedOutputStream(stream));
}
}
@@ -87,6 +92,7 @@
ExchangeImpl ex = new ExchangeImpl();
ex.setInMessage(copy);
ex.put(IN_EXCHANGE, message.getExchange());
+ ex.put(LocalConduit.DIRECT_DISPATCH, true);
ex.setDestination(destination);
destination.getMessageObserver().onMessage(copy);
Modified:
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java?rev=583696&r1=583695&r2=583696&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
(original)
+++
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
Wed Oct 10 23:39:41 2007
@@ -73,8 +73,8 @@
this.conduit = conduit;
}
- public void prepare(final Message message) throws IOException {
- if (!Boolean.TRUE.equals(message.get(LocalConduit.DIRECT_DISPATCH))) {
+ public void prepare(final Message message) throws IOException {
+ if (!Boolean.TRUE.equals(message.getExchange().get(LocalConduit.DIRECT_DISPATCH))) {
final Exchange exchange =
(Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
final PipedInputStream stream = new PipedInputStream();
@@ -90,24 +90,27 @@
};
PipedOutputStream outStream = new PipedOutputStream(stream);
- message.setContent(OutputStream.class, outStream);
-
+ message.setContent(OutputStream.class, outStream);
new Thread(receiver).start();
+
+ } else {
+ PipedInputStream stream = new PipedInputStream();
+ message.setContent(InputStream.class, stream);
+ message.setContent(OutputStream.class, new PipedOutputStream(stream));
}
}
@Override
public void close(Message message) throws IOException {
- if (Boolean.TRUE.equals(message.get(LocalConduit.DIRECT_DISPATCH))) {
+ if (Boolean.TRUE.equals(message.getExchange().get(LocalConduit.DIRECT_DISPATCH))) {
final Exchange exchange =
(Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
MessageImpl copy = new MessageImpl();
copy.putAll(message);
MessageImpl.copyContent(message, copy);
- if (exchange.getInMessage() == null) {
+ if (exchange != null && exchange.getInMessage() == null) {
exchange.setInMessage(copy);
- }
-
+ }
conduit.getMessageObserver().onMessage(copy);
return;
}
Modified:
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?rev=583696&r1=583695&r2=583696&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
(original)
+++
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
Wed Oct 10 23:39:41 2007
@@ -66,7 +66,7 @@
setTransportIds(ids);
messageFilterProperties = new HashSet<String>();
- messageFilterProperties.add(Message.REQUESTOR_ROLE);
+ messageFilterProperties.add(Message.REQUESTOR_ROLE);
}
@Resource(name = "bus")
Modified:
incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java?rev=583696&r1=583695&r2=583696&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
(original)
+++
incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
Wed Oct 10 23:39:41 2007
@@ -19,18 +19,17 @@
package org.apache.cxf.transport.local;
-import java.io.ByteArrayInputStream;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
+
import org.apache.cxf.transport.MessageObserver;
import org.junit.Assert;
import org.junit.Test;
@@ -38,35 +37,16 @@
public class LocalTransportFactoryTest extends Assert {
@Test
- public void testTransportFactory() throws Exception {
- LocalTransportFactory factory = new LocalTransportFactory();
-
- EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
- AddressType a = new AddressType();
- a.setLocation("http://localhost/test");
- ei.addExtensor(a);
-
- Destination d = factory.getDestination(ei);
- d.setMessageObserver(new EchoObserver());
-
-
- Conduit conduit = factory.getConduit(ei);
- TestMessageObserver obs = new TestMessageObserver();
- conduit.setMessageObserver(obs);
-
- Message m = new MessageImpl();
- conduit.prepare(m);
-
- OutputStream out = m.getContent(OutputStream.class);
- out.write("hello".getBytes());
- out.close();
-
-
- assertEquals("hello", obs.getResponseStream().toString());
+ public void testLocalTransportWithSeperateThread() throws Exception {
+ testInvocation(false);
}
@Test
- public void testDirectInvocation() throws Exception {
+ public void testLocalTranpsortWithDirectDispatch() throws Exception {
+ testInvocation(true);
+ }
+
+ private void testInvocation(boolean isDirectDispatch) throws Exception {
LocalTransportFactory factory = new LocalTransportFactory();
EndpointInfo ei = new EndpointInfo(null,
"http://schemas.xmlsoap.org/soap/http");
@@ -83,10 +63,15 @@
conduit.setMessageObserver(obs);
MessageImpl m = new MessageImpl();
- m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+ if (isDirectDispatch) {
+ m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+ }
m.setDestination(d);
- m.setContent(InputStream.class, new ByteArrayInputStream("hello".getBytes()));
conduit.prepare(m);
+
+ OutputStream out = m.getContent(OutputStream.class);
+ out.write("hello".getBytes());
+ out.close();
conduit.close(m);
assertEquals("hello", obs.getResponseStream().toString());
@@ -95,20 +80,19 @@
public void onMessage(Message message) {
try {
+ message.getExchange().setInMessage(message);
Conduit backChannel =
message.getDestination().getBackChannel(message, null, null);
- message.remove(LocalConduit.DIRECT_DISPATCH);
+ InputStream in = message.getContent(InputStream.class);
+ assertNotNull(in);
backChannel.prepare(message);
-
OutputStream out = message.getContent(OutputStream.class);
- assertNotNull(out);
- InputStream in = message.getContent(InputStream.class);
- assertNotNull(in);
-
+ assertNotNull(out);
copy(in, out, 1024);
-
out.close();
- in.close();
+ in.close();
+ backChannel.close(message);
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -135,6 +119,7 @@
class TestMessageObserver implements MessageObserver {
ByteArrayOutputStream response = new ByteArrayOutputStream();
boolean written;
+ Message inMessage;
public synchronized ByteArrayOutputStream getResponseStream() throws
Exception {
if (!written) {
@@ -148,6 +133,7 @@
try {
message.remove(LocalConduit.DIRECT_DISPATCH);
copy(message.getContent(InputStream.class), response, 1024);
+ inMessage = message;
} catch (IOException e) {
e.printStackTrace();
fail();
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java?rev=583696&view=auto
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
(added)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
Wed Oct 10 23:39:41 2007
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.local_transport;
+
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Service;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.binding.soap.SoapTransportFactory;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.ConduitInitiatorManager;
+import org.apache.cxf.transport.DestinationFactoryManager;
+import org.apache.cxf.transport.local.LocalConduit;
+import org.apache.cxf.transport.local.LocalTransportFactory;
+import org.apache.hello_world_soap_http.Greeter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DirectDispatchClientTest extends AbstractBusClientServerTestBase {
+ private static Bus staticBus;
+ private final QName serviceName = new QName("http://apache.org/hello_world_soap_http",
+ "SOAPService");
+ private final QName localPortName = new QName("http://apache.org/hello_world_soap_http",
+ "localPortName");
+
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ staticBus = BusFactory.getDefaultBus();
+ setupLocalTransport(staticBus);
+ BusFactory.setThreadDefaultBus(staticBus);
+ assertTrue("server did not launch correctly", launchServer(Server.class, true));
+ }
+
+ @Test
+ public void testDirectDispatch() {
+ invokeService(true);
+ }
+
+ @Test
+ public void testPipeLineDispatch() {
+ invokeService(false);
+ }
+
+ public static void setupLocalTransport(Bus bus) {
+ DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
+
+ SoapTransportFactory soapDF = new SoapTransportFactory();
+ soapDF.setBus(bus);
+ dfm.registerDestinationFactory("http://schemas.xmlsoap.org/wsdl/soap/", soapDF);
+ dfm.registerDestinationFactory("http://schemas.xmlsoap.org/soap/", soapDF);
+ dfm.registerDestinationFactory("http://cxf.apache.org/transports/local", soapDF);
+
+ LocalTransportFactory localTransport = new LocalTransportFactory();
+ dfm.registerDestinationFactory("http://schemas.xmlsoap.org/soap/http", localTransport);
+ dfm.registerDestinationFactory("http://schemas.xmlsoap.org/wsdl/soap/http", localTransport);
+ dfm.registerDestinationFactory("http://cxf.apache.org/bindings/xformat", localTransport);
+ dfm.registerDestinationFactory("http://cxf.apache.org/transports/local", localTransport);
+
+ ConduitInitiatorManager extension = bus.getExtension(ConduitInitiatorManager.class);
+ extension.registerConduitInitiator(LocalTransportFactory.TRANSPORT_ID, localTransport);
+ extension.registerConduitInitiator("http://schemas.xmlsoap.org/wsdl/soap/", localTransport);
+ extension.registerConduitInitiator("http://schemas.xmlsoap.org/soap/http", localTransport);
+ extension.registerConduitInitiator("http://schemas.xmlsoap.org/soap/", localTransport);
+ }
+
+ private void invokeService(boolean isDirectDispatch) {
+ BusFactory.setThreadDefaultBus(staticBus);
+ Service service = Service.create(serviceName);
+ service.addPort(localPortName, "http://schemas.xmlsoap.org/soap/",
+ "local://Greeter");
+ Greeter greeter = service.getPort(localPortName, Greeter.class);
+
+ if (isDirectDispatch) {
+ InvocationHandler handler = Proxy.getInvocationHandler(greeter);
+ BindingProvider bp = null;
+ if (handler instanceof BindingProvider) {
+ bp = (BindingProvider)handler;
+ Map<String, Object> requestContext = bp.getRequestContext();
+ requestContext.put(LocalConduit.DIRECT_DISPATCH, true);
+
+ }
+ }
+
+ String reply = greeter.greetMe("test");
+ assertEquals("Hello test", reply);
+ reply = greeter.sayHi();
+ assertNotNull("no response received from service", reply);
+ assertEquals("Bonjour", reply);
+
+ }
+
+
+
+}
Propchange:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/DirectDispatchClientTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java?rev=583696&view=auto
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
(added)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
Wed Oct 10 23:39:41 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.local_transport;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.hello_world_soap_http.GreeterImpl;
+
+public class Server extends AbstractBusTestServerBase {
+
+ protected void run() {
+ Object implementor = new GreeterImpl();
+ String address = "local://Greeter";
+ Endpoint.publish(address, implementor);
+ }
+
+
+ public static void main(String[] args) {
+ try {
+ Server s = new Server();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+}
+
Propchange:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/local_transport/Server.java
------------------------------------------------------------------------------
svn:keywords = Rev Date