Hi,
I am using fuse-esb-3.3.1.3 and fuse-message-broker-5.0.0.17.
My requirrement is to build a cxf-se SU which has wsdl and implementation
class.I have servicemix-jms SU which acts as consumer and picks up the
message from queue.
SOAp message is converted to JMS Message and placed it an queue by a client
program. This message should be picked by servicemix-jms and it should send
it to cxf-se for further processing.
I have service engine service unit which follows WSDL first approach .
After compilation and deployment of the project , when i try to place in the
message in Queue using client program i am gettting following error.
It would be very helpful if I get guidance to solve this.
My Client program to place JMS Message is
package com.test;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import javax.xml.soap.MessageFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPMessage;
import javax.xml.soap.SOAPPart;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import com.sun.messaging.xml.MessageTransformer;
import com.cvs.eph.entities._1_0.*;
import com.cvs.eph.messages._1_0.*;
public class Producer {
private Destination destination;
// private int messageCount = 10;
private long sleepTime;
//private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
//private String subject = "TOOL1.DEFAULT";
private String subject = "queues/EPHOngoingMaintenanceInput";
private boolean topic;
private boolean transacted;
private boolean persistent;
public static void main(String[] args) {
Producer producerTool = new Producer();
producerTool.run();
}
public void run() {
Connection connection = null;
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " +
messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (persistent ? "persistent" :
"non-persistent") + " messages");
System.out.println("Sleeping between publish " + sleepTime + "
ms");
if (timeToLive != 0) {
System.out.println("Messages time to live " + timeToLive + "
ms");
}
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
// ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://blrkec38454d.ad.infosys.com:61616");
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0) {
producer.setTimeToLive(timeToLive);
}
// Start sending messages
sendLoop(session, producer);
System.out.println("Done.");
// Use the ActiveMQConnection interface to dump the connection
// stats.
ActiveMQConnection c = (ActiveMQConnection)connection;
c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void sendLoop(Session session, MessageProducer producer)
throws Exception {
//for (int i = 0; i < messageCount || messageCount == 0; i++) {
try{
System.out.println("in send loop");
OngoingMaintenance o = new OngoingMaintenance();
o.setApplicationID("OMA");
o.setActionCode("A");
o.setMaintenanceRequestID("9");
/* HealthCareCustomer h = new HealthCareCustomer();
HCC hcc = new HCC();
MPH mph = new MPH();
String[] pharray= {"111111","12345"};
for(int i=0; i<pharray.length; i++){
MobPhone mphone = new MobPhone();
mphone.setMobilePhone(BigInteger.valueOf(Integer.parseInt(pharray[i])));
mph.getMobPhone().add(mphone);
}
//MobPhone mphone2 = new MobPhone();
h.setMPH(mph);
hcc.setFirstName("Infosys");
//hcc.setEmail("[EMAIL PROTECTED]");
hcc.setLastName("Balasubramanian");
hcc.setMultBirthInd("Y");
hcc.setGender("F");
hcc.setPetInd(null);
h.setHCC(hcc);
o.setHealthCareCustomer(h);*/
/*construct a default soap MessageFactory */
MessageFactory mf = MessageFactory.newInstance();
/* Create a SOAP message object.*/
SOAPMessage soapMessage = mf.createMessage();
SOAPPart soapPart = soapMessage.getSOAPPart();
SOAPEnvelope envelope = soapPart.getEnvelope();
SOAPBody soapBody = (SOAPBody)envelope.getBody();
soapMessage.setProperty(javax.xml.soap.SOAPMessage.CHARACTER_SET_ENCODING,
"utf-8");
JAXBContext jc1 = JAXBContext.newInstance(
"com.cvs.eph.messages._1_0" );
Marshaller marshaller = jc1.createMarshaller();
marshaller.setProperty( Marshaller.JAXB_FORMATTED_OUTPUT,
Boolean.TRUE );
marshaller.marshal( o, System.out );
marshaller.marshal( o,soapBody);
soapMessage.saveChanges();
soapMessage.writeTo(System.out);
Message m =
MessageTransformer.SOAPMessageIntoJMSMessage(soapMessage, session );
producer.send(m);
if (transacted) {
session.commit();
}
}catch(Exception e){
System.out.println("exception-->"+e);
e.printStackTrace();
}
}
public void setPersistent(boolean durable) {
this.persistent = durable;
}
/*public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}*/
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
}
It places messages in the following way
<SOAP-ENV:Envelope
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"><SOAP-ENV:Header/><SOAP-ENV:Body><ns2:OngoingMaintenance
xmlns:ns2="http://www.cvs.com/eph/messages/1_0"
xmlns:ns3="http://www.cvs.com/eph/entities/1_0"><ns2:MaintenanceRequestID>9</ns2:MaintenanceRequestID><ns2:ApplicationID>OMA</ns2:ApplicationID><ns2:ActionCode>A</ns2:ActionCode></ns2:OngoingMaintenance></SOAP-ENV:Body></SOAP-ENV:Envelope>
I have service engine SU(cxf-se) and the xbean entry is
<cxfse:endpoint>
<cxfse:pojo>
<bean class="com.cvs.eph.ongngmaint.OngoingMaintServiceImpl" />
</cxfse:pojo>
</cxfse:endpoint>
I Have added follwoing dependencies in pom file of SU
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.servicemix</groupId>
<artifactId>servicemix-core</artifactId>
<version>${servicemix-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.servicemix</groupId>
<artifactId>servicemix-cxf-se</artifactId>
<version>${servicemix-version}</version>
</dependency>
<dependency>
<groupId>org.apache.servicemix</groupId>
<artifactId>servicemix-jbi</artifactId>
<version>${servicemix-version}</version>
<scope>provided</scope>
</dependency>
<!-- Dependencies added for SOAP -->
<dependency>
<groupId>saaj-api</groupId>
<artifactId>saaj-api</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>saaj-impl</groupId>
<artifactId>saaj-impl</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>imq</groupId>
<artifactId>imq</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>imqxm</groupId>
<artifactId>imqxm</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jaxm-api</groupId>
<artifactId>jaxm-api</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jaxws-api</groupId>
<artifactId>jaxws-api</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>jaxb</groupId>
<artifactId>jaxb-xjc</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>jaxb</groupId>
<artifactId>jsr181-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>jaxb</groupId>
<artifactId>jsr173_api</artifactId>
<version>2.1</version>
</dependency>
My wsdl which i have placed under src/main/resource of servicemix-jms Su and
cxf-se SU.
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions xmlns:eph1_0="http://www.cvs.com/eph/wsdl/1_0"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:jms="http://cxf.apache.org/transports/jms"
name="EPHOngoingMaintenanceInOnlyService"
targetNamespace="http://www.cvs.com/eph/wsdl/1_0"
xmlns:msg="http://www.cvs.com/eph/messages/1_0">
<wsdl:types>
<xsd:schema
targetNamespace="http://www.cvs.com/eph/messages/1_0">
<xsd:include
schemaLocation="messages/EPHCommonTypes.xsd"/>
<xsd:include
schemaLocation="messages/OngoingMaintenance.xsd"/>
</xsd:schema>
</wsdl:types>
<wsdl:message name="Acknowledgement">
<wsdl:part name="ack" element="msg:Acknowledgement"/>
</wsdl:message>
<wsdl:message name="OperationFault">
<wsdl:part name="fault" element="msg:OperationFault"/>
</wsdl:message>
<wsdl:message name="DuplicateFound">
<wsdl:part name="fault" element="msg:DuplicateFound"/>
</wsdl:message>
<wsdl:message name="OngoingMaintenance">
<wsdl:part name="request"
element="msg:OngoingMaintenance"/>
</wsdl:message>
<wsdl:portType name="EPHOngoingMaintenanceInOnlyPort">
<wsdl:operation name="OngoingMaintenance">
<wsdl:input
message="eph1_0:OngoingMaintenance"/>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="EPHOngoingmaintenaneBinding"
type="eph1_0:EPHOngoingMaintenanceInOnlyPort">
<soap:binding style="document"
transport="http://schemas.xmlsoap.org/soap/jms"/>
<wsdl:operation name="OngoingMaintenance">
<wsdl:input>
<soap:body use="literal"/>
</wsdl:input>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="EPHOngoingMaintenanceJMSService">
<wsdl:port name="EPHOngoingMaintenanceInOnlyPort"
binding="eph1_0:EPHOngoingmaintenaneBinding">
<jms:address
jndiConnectionFactoryName="ConnectionFactory"
jndiDestinationName="queues/EPHOngoingMaintenanceInput">
<jms:JMSNamingProperty
name="java.naming.factory.initial"
value="org.activemq.jndi.ActiveMQInitialContextFactory"/>
<jms:JMSNamingProperty
name="java.naming.provider.url"
value="tcp://localhost:61616"/>
</jms:address>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
Implementation class.. under src/main/java
package com.cvs.eph.ongngmaint;
import com.cvs.eph.*;
import com.cvs.eph.messages._1_0.OngoingMaintenance;
import com.cvs.eph.wsdl._1_0.EPHOngoingMaintenanceInOnlyPort;
import javax.jws.WebService;
@WebService(targetNamespace ="http://www.cvs.com/eph/wsdl/1_0",
serviceName="EPHOngoingMaintenanceJMSService",endpointInterface="com.cvs.eph.wsdl._1_0.EPHOngoingMaintenanceInOnlyPort")
public class OngoingMaintServiceImpl implements
EPHOngoingMaintenanceInOnlyPort{
public void ongoingMaintenance(OngoingMaintenance objOngoingMaintenance){
System.out.println("inside Ongoing Maintenace IMPL--");
System.out.println(" Ongoing Maintenance
object---"+objOngoingMaintenance);
System.out.println(""+objOngoingMaintenance.getApplicationID());
}
}
xbean.xml entry for servicemix-jms
<beans xmlns:jms="http://servicemix.apache.org/jms/1.0"
xmlns:om="http://www.cvs.com/eph/wsdl/1_0"
xmlns:amq="http://activemq.org/config/1.0"
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://servicemix.apache.org/jms/1.0
http://servicemix.apache.org/schema/servicemix-jms-3.2.2.xsd
http://activemq.org/config/1.0
http://activemq.apache.org/schema/core/activemq-core-4.1.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<jms:endpoint service="om:EPHOngoingMaintenanceJMSService"
endpoint="jms+soap"
targetInterfaceName="om:EPHOngoingMaintenanceInOnlyPort"
role="consumer"
destinationStyle="queue"
jmsProviderDestinationName="queues/EPHOngoingMaintenanceInput"
soap="true"
defaultMep="http://www.w3.org/2004/08/wsdl/in-only"
connectionFactory="#connectionFactory"
wsdlResource="classpath:EPHOngoingMaintenance.wsdl" />
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</beans>
I am getting following error when i place message in queue......
ERROR - MultiplexingConsumerProcessor - Error while handling jms message
com.ctc.wstx.exc.WstxIOException: Invalid UTF-8 middle byte 0xed (at char
#3, by
te #-1)
at
com.ctc.wstx.sr.StreamScanner.throwFromIOE(StreamScanner.java:683)
at
com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1086)
at
javax.xml.stream.util.StreamReaderDelegate.next(StreamReaderDelegate.
java:59)
at
org.apache.servicemix.jbi.jaxp.ExtendedXMLStreamReader.next(ExtendedX
MLStreamReader.java:61)
at
org.apache.servicemix.jbi.jaxp.ExtendedXMLStreamReader.nextTag(Extend
edXMLStreamReader.java:44)
at
org.apache.servicemix.soap.marshalers.SoapReader.readSoapUsingStax(So
apReader.java:164)
at
org.apache.servicemix.soap.marshalers.SoapReader.read(SoapReader.java
:89)
at
org.apache.servicemix.soap.marshalers.SoapReader.read(SoapReader.java
:76)
at
org.apache.servicemix.jms.DefaultJmsMarshaler.toSOAP(DefaultJmsMarsha
ler.java:179)
at
org.apache.servicemix.jms.AbstractJmsProcessor.toNMS(AbstractJmsProce
ssor.java:168)
at
org.apache.servicemix.jms.multiplexing.MultiplexingConsumerProcessor.
access$300(MultiplexingConsumerProcessor.java:38)
at
org.apache.servicemix.jms.multiplexing.MultiplexingConsumerProcessor$
1.run(MultiplexingConsumerProcessor.java:89)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
utor.java:650)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:675)
at java.lang.Thread.run(Thread.java:595)
Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0xed
(at c
har #3, byte #-1)
at
com.ctc.wstx.io.UTF8Reader.reportInvalidOther(UTF8Reader.java:310)
at com.ctc.wstx.io.UTF8Reader.read(UTF8Reader.java:208)
at com.ctc.wstx.io.ReaderSource.readInto(ReaderSource.java:84)
at
com.ctc.wstx.io.BranchingReaderSource.readInto(BranchingReaderSource.
java:57)
at com.ctc.wstx.sr.StreamScanner.loadMore(StreamScanner.java:967)
at com.ctc.wstx.sr.StreamScanner.getNext(StreamScanner.java:738)
at
com.ctc.wstx.sr.BasicStreamReader.nextFromProlog(BasicStreamReader.ja
va:1995)
at
com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1069)
... 13 more
Pls help me to resolve this.
Jayasree.B
--
View this message in context:
http://www.nabble.com/Urgent--MultiplexingConsumerProcessor-Error-while-handling-jms-message-tp20299928p20299928.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.