Hi,
I have set up an example for routing from a jms endpoint to a cxf
endpoint using the camel transport in cxf.
To test the service I am using a normal cxf client on the jms queue.
When I run my test I get an exception. It seems the unmarshal method
tries to convert the incoming message to either TextMessage,
BytesMessage or ObjectMessage.
As the class ActiveMqMessage is none of these it does not work. Any idea
why this happens?
I have attached the exception, my log, the java code and the cxf config.
java.lang.ClassCastException:
org.apache.activemq.command.ActiveMQMessage cannot be cast to
javax.jms.ObjectMessage
at
org.apache.cxf.transport.jms.JMSTransportBase.unmarshal(JMSTransportBase.java:125)
at org.apache.cxf.transport.jms.JMSConduit.receive(JMSConduit.java:162)
at
org.apache.cxf.transport.jms.JMSConduit.access$100(JMSConduit.java:55)
at
org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.handleResponse(JMSConduit.java:366)
at
org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.doClose(JMSConduit.java:258)
at
org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:156)
at
org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
at
org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:159)
at
org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
at
org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:221)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:276)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:222)
at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
at
org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:177)
at $Proxy36.echo(Unknown Source)
at JmsToCXF.run(JmsToCXF.java:125)
at JmsToCXF.main(JmsToCXF.java:136)
-----
import java.util.Arrays;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cxf.HelloService;
import org.apache.camel.component.cxf.HelloServiceImpl;
import org.apache.camel.component.cxf.transport.CamelTransportFactory;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.spring.SpringBusFactory;
import org.apache.cxf.endpoint.ServerImpl;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
import org.apache.cxf.transport.ConduitInitiatorManager;
import org.apache.cxf.transport.DestinationFactoryManager;
/**
* An example class for demonstrating some of the basics behind camel This
* example will send some text messages on to a JMS Queue, consume them and
* persist them to disk
*
* @version $Revision: 529902 $
*/
public final class JmsToCXF {
private JmsToCXF() {
}
/**
* @return
* @throws Exception
*/
private CamelContext createCamel() throws Exception {
CamelContext context = new DefaultCamelContext();
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
context.addComponent("test-jms",
JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
context.addRoutes(new RouteBuilder() {
public void configure() {
from("test-jms:queue:test")
.to("direct:test");
}
});
return context;
}
public void createCXFBus(CamelContext context) {
//setup the camel transport for the bus
Bus bus = new SpringBusFactory().createBus("cxf.xml");
CamelTransportFactory camelTransportFactory = new
CamelTransportFactory();
camelTransportFactory.setCamelContext(context);
camelTransportFactory.setBus(bus);
// register the conduit initiator
ConduitInitiatorManager cim =
bus.getExtension(ConduitInitiatorManager.class);
cim.registerConduitInitiator(CamelTransportFactory.TRANSPORT_ID,
camelTransportFactory);
// register the destination factory
DestinationFactoryManager dfm =
bus.getExtension(DestinationFactoryManager.class);
dfm.registerDestinationFactory(CamelTransportFactory.TRANSPORT_ID,
camelTransportFactory);
// set the default bus for
BusFactory.setDefaultBus(bus);
String[] trIds = new String[] {CamelTransportFactory.TRANSPORT_ID};
camelTransportFactory.setTransportIds(Arrays.asList(trIds));
}
public ServerImpl startService() {
JaxWsServerFactoryBean svrBean = new JaxWsServerFactoryBean();
svrBean.setAddress("camel://direct:test");
svrBean.setServiceClass(HelloService.class);
svrBean.setServiceBean(new HelloServiceImpl());
svrBean.getInInterceptors().add(new LoggingInInterceptor());
svrBean.getOutInterceptors().add(new LoggingOutInterceptor());
ServerImpl server = (ServerImpl)svrBean.create();
server.start();
return server;
}
/**
* Start a client that calls the service over hte jms queue to
* test the route
* @return
*/
public HelloService createClient() {
JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
factory.setServiceClass(HelloService.class);
factory.setAddress("jms://");
return (HelloService) factory.create();
}
public void run() throws Exception {
/**
* Start camel context with a jms component
*/
CamelContext context = createCamel();
/**
* Create the cxf bus and integrate it with camel
*/
createCXFBus(context);
ServerImpl server = startService();
HelloService client = createClient();
context.start();
String reply = client.echo("hallo");
context.stop();
server.stop();
//broker.stop();
System.exit(0);
}
public static void main(String args[]) throws Exception {
new JmsToCXF().run();
}
}
-------------------
<jms:conduit
name="{http://cxf.component.camel.apache.org/}HelloServicePort.jms-conduit">
<jms:clientConfig clientReceiveTimeout="11000"
messageTimeToLive="10000"/>
<jms:address destinationStyle="queue"
jndiConnectionFactoryName="ConnectionFactory"
jndiDestinationName="dynamicQueues/test"
connectionUserName="testUser"
connectionPassword="testPassword">
<jms:JMSNamingProperty name="java.naming.factory.initial"
value="org.apache.activemq.jndi.ActiveMQInitialContextFactory" />
<jms:JMSNamingProperty name="java.naming.provider.url"
value="tcp://localhost:61616" />
</jms:address>
</jms:conduit>
------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>eu.cschneider</groupId>
<artifactId>cameltest</artifactId>
<version>1.0.0-SNAPSHOT</version>
<repositories>
<repository>
<id>apache.incubating.releases</id>
<name>Apache Incubating Release Distribution Repository</name>
<url>http://people.apache.org/repo/m2-incubating-repository</url>
</repository>
</repositories>
<properties>
<spring.version>2.5.4</spring.version>
<cxf.version>2.1</cxf.version>
<camel-version>1.3.0</camel-version>
</properties>
<dependencies>
<!-- Spring is directly included to override the version 2.0.4
cxf brings -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<!-- Depending on your requirements you may need more or less
modules from cxf. To make the start easier there are
some more listed than you need for this example. -->
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-core</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-simple</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxws</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-databinding-aegis</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-local</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-jms</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-management</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-common-utilities</artifactId>
<version>${cxf.version}</version>
</dependency>
<!--
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.5</version>
</dependency>
-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.tibco</groupId>
<artifactId>tibjms</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel-version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>${camel-version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cxf</artifactId>
<version>${camel-version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-core</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-simple</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxws</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-databinding-aegis</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-local</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-jms</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-management</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-common-utilities</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-tools-common</artifactId>
<version>${cxf.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.5</source>
<target>1.5</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Willem Jiang schrieb:
Hi Christian,
The default exchange pattern of camel-cxf component is InOut, but for
the camel-jms consumer it just receive message from message queue.
If you want send the response message back , you have to put the
message to the response queue, just like this
from("jms:myqueue").choice()
.when(xpath("//namespace=service1namespace")).to("cxf:mybeanname:myEndpointName").to("jms:responsequeue")
.when(xpath("//namespace=service2namespace")).to("cxf:mybeanname2:myEndpointName2").to("jms:responsequeue")
.otherwise().to("jms:deadletterqueue")
BTW, If you alway want to let CXF take care of the service publishing
work, you could use camel transport which will be more effective than
the local transport.
Here are the example[1] and document[2]
[1]http://cwiki.apache.org/CAMEL/cxf-example.html
[2]http://cwiki.apache.org/CAMEL/camel-transport-for-cxf.html
Willem
Christian Schneider wrote: