Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=730924&r1=730923&r2=730924&view=diff ============================================================================== --- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original) +++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Fri Jan 2 22:33:39 2009 @@ -15,81 +15,85 @@ */ package org.apache.axis2.transport.jms; +import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; -import org.apache.axis2.transport.base.threads.WorkerPool; import org.apache.axis2.transport.base.BaseUtils; import org.apache.axis2.transport.base.BaseConstants; import org.apache.axis2.transport.base.MetricsCollector; +import org.apache.axis2.transport.jms.ctype.ContentTypeInfo; import org.apache.axis2.description.Parameter; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import javax.jms.*; import javax.xml.namespace.QName; +import javax.transaction.UserTransaction; /** - * This is the actual receiver which listens for and accepts JMS messages, and - * hands them over to be processed by a worker thread. An instance of this - * class is created for each JMSConnectionFactory, but all instances may and - * will share the same worker thread pool held by the JMSListener + * This is the JMS message receiver which is invoked when a message is received. This processes + * the message through the engine */ -public class JMSMessageReceiver implements MessageListener { +public class JMSMessageReceiver { private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); /** The JMSListener */ private JMSListener jmsListener = null; - /** The thread pool of workers */ - private WorkerPool workerPool = null; - /** The Axis configuration context */ - private ConfigurationContext cfgCtx = null; - /** A reference to the JMS Connection Factory to which this applies */ + /** A reference to the JMS Connection Factory */ private JMSConnectionFactory jmsConnectionFactory = null; - /** The name of the service this message receiver is bound to. */ - final String serviceName; - /** Metrics collector */ + /** The JMS metrics collector */ private MetricsCollector metrics = null; + /** The endpoint this message receiver is bound to */ + final JMSEndpoint endpoint; /** * Create a new JMSMessage receiver * * @param jmsListener the JMS transport Listener - * @param jmsConFac the JMS connection factory we are associated with - * @param workerPool the worker thread pool to be used - * @param cfgCtx the axis ConfigurationContext + * @param jmsConFac the JMS connection factory we are associated with + * @param workerPool the worker thread pool to be used + * @param cfgCtx the axis ConfigurationContext * @param serviceName the name of the Axis service + * @param endpoint the JMSEndpoint definition to be used */ - JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, - WorkerPool workerPool, ConfigurationContext cfgCtx, String serviceName) { + JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { this.jmsListener = jmsListener; this.jmsConnectionFactory = jmsConFac; - this.workerPool = workerPool; - this.cfgCtx = cfgCtx; - this.serviceName = serviceName; + this.endpoint = endpoint; this.metrics = jmsListener.getMetricsCollector(); } /** - * The entry point on the reception of each JMS message + * Process a new message received * * @param message the JMS message received + * @param ut UserTransaction which was used to receive the message + * @return true if caller should commit */ - public void onMessage(Message message) { - // directly create a new worker and delegate processing + public boolean onMessage(Message message, UserTransaction ut) { + try { if (log.isDebugEnabled()) { StringBuffer sb = new StringBuffer(); - sb.append("Received JMS message to destination : " + message.getJMSDestination()); - sb.append("\nMessage ID : " + message.getJMSMessageID()); - sb.append("\nCorrelation ID : " + message.getJMSCorrelationID()); - sb.append("\nReplyTo ID : " + message.getJMSReplyTo()); + sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); + sb.append("\nDestination : ").append(message.getJMSDestination()); + sb.append("\nMessage ID : ").append(message.getJMSMessageID()); + sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); + sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); + sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); + sb.append("\nPriority : ").append(message.getJMSPriority()); + sb.append("\nExpiration : ").append(message.getJMSExpiration()); + sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); + sb.append("\nMessage Type : ").append(message.getJMSType()); + sb.append("\nPersistent ? : ").append( + DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); + log.debug(sb.toString()); if (log.isTraceEnabled() && message instanceof TextMessage) { - log.trace("\nMessage : " + ((TextMessage) message).getText()); + log.trace("\nMessage : " + ((TextMessage) message).getText()); } } } catch (JMSException e) { @@ -100,134 +104,130 @@ // update transport level metrics try { - if (message instanceof BytesMessage) { - metrics.incrementBytesReceived((JMSUtils.getBodyLength((BytesMessage) message))); - } else if (message instanceof MapMessage) { - metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) message))); - } else if (message instanceof TextMessage) { - metrics.incrementBytesReceived(((TextMessage) message).getText().getBytes().length); - } else { - handleException("Unsupported JMS message type : " + message.getClass().getName()); - } + metrics.incrementBytesReceived(JMSUtils.getMessageSize(message)); } catch (JMSException e) { log.warn("Error reading JMS message size to update transport metrics", e); } // has this message already expired? expiration time == 0 means never expires try { - long expiryTime = message.getJMSExpiration(); + long expiryTime = message.getJMSExpiration(); if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { if (log.isDebugEnabled()) { log.debug("Discard expired message with ID : " + message.getJMSMessageID()); } - return; + return true; } } catch (JMSException ignore) {} - workerPool.execute(new Worker(message)); - } - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } + boolean successful = false; + try { + successful = processThoughEngine(message, ut); - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); - } + } catch (JMSException e) { + log.error("JMS Exception encountered while processing", e); + } catch (AxisFault e) { + log.error("Axis fault processing message", e); + } catch (Exception e) { + log.error("Unknown error processing message", e); + } finally { + if (successful) { + metrics.incrementMessagesReceived(); + } else { + metrics.incrementFaultsReceiving(); + } + } + + return successful; + } /** - * The actual Worker implementation which will process the - * received JMS messages in the worker thread pool + * Process the new message through Axis2 + * + * @param message the JMS message + * @param ut the UserTransaction used for receipt + * @return true if the caller should commit + * @throws JMSException, on JMS exceptions + * @throws AxisFault on Axis2 errors */ - class Worker implements Runnable { + private boolean processThoughEngine(Message message, UserTransaction ut) + throws JMSException, AxisFault { - private Message message = null; + MessageContext msgContext = jmsListener.createMessageContext(); - Worker(Message message) { - this.message = message; + // set the JMS Message ID as the Message ID of the MessageContext + try { + msgContext.setMessageID(message.getJMSMessageID()); + msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); + } catch (JMSException ignore) {} + + String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); + + AxisService service = endpoint.getService(); + msgContext.setAxisService(service); + + // find the operation for the message, or default to one + Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); + QName operationQName = ( + operationParam != null ? + BaseUtils.getQNameFromString(operationParam.getValue()) : + BaseConstants.DEFAULT_OPERATION); + + AxisOperation operation = service.getOperation(operationQName); + if (operation != null) { + msgContext.setAxisOperation(operation); + msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); } - public void run() { - - MessageContext msgContext = jmsListener.createMessageContext(); - - // set the JMS Message ID as the Message ID of the MessageContext - try { - msgContext.setMessageID(message.getJMSMessageID()); - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); - } catch (JMSException ignore) {} - - AxisService service = null; - try { - String soapAction = JMSUtils. - getProperty(message, BaseConstants.SOAPACTION); - - // set to bypass dispatching if we know the service - we already should! - if (serviceName != null) { - service = cfgCtx.getAxisConfiguration().getService(serviceName); - msgContext.setAxisService(service); - - // find the operation for the message, or default to one - Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); - QName operationQName = ( - operationParam != null ? - BaseUtils.getQNameFromString(operationParam.getValue()) : - BaseConstants.DEFAULT_OPERATION); - - AxisOperation operation = service.getOperation(operationQName); - if (operation != null) { - msgContext.setAxisOperation(operation); - msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); - } - } + ContentTypeInfo contentTypeInfo = + endpoint.getContentTypeRuleSet().getContentTypeInfo(message); + if (contentTypeInfo == null) { + throw new AxisFault("Unable to determine content type for message " + + msgContext.getMessageID()); + } - // set the message property OUT_TRANSPORT_INFO - // the reply is assumed to be over the JMSReplyTo destination, using - // the same incoming connection factory, if a JMSReplyTo is available - if (message.getJMSReplyTo() != null) { - msgContext.setProperty( - Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo(jmsConnectionFactory, message.getJMSReplyTo())); - - } else if (service != null) { - // does the service specify a default reply destination ? - Parameter param = service.getParameter(JMSConstants.REPLY_PARAM); - if (param != null && param.getValue() != null) { - msgContext.setProperty( - Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo( - jmsConnectionFactory, - jmsConnectionFactory.getDestination((String) param.getValue()))); - } - } + // set the message property OUT_TRANSPORT_INFO + // the reply is assumed to be over the JMSReplyTo destination, using + // the same incoming connection factory, if a JMSReplyTo is available + Destination replyTo = message.getJMSReplyTo(); + if (replyTo == null) { + // does the service specify a default reply destination ? + Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); + if (param != null && param.getValue() != null) { + replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); + } - String contentType = null; - if (service != null) { - contentType = (String)service.getParameterValue(JMSConstants.CONTENT_TYPE_PARAM); - } - if (contentType == null) { - contentType - = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE); - } - - JMSUtils.setSOAPEnvelope(message, msgContext, contentType); + } + if (replyTo != null) { + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, + new JMSOutTransportInfo(jmsConnectionFactory, replyTo, + contentTypeInfo.getPropertyName())); + } - jmsListener.handleIncomingMessage( - msgContext, - JMSUtils.getTransportHeaders(message), - soapAction, - contentType - ); - metrics.incrementMessagesReceived(); + JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); + if (ut != null) { + msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); + } - } catch (Throwable e) { - metrics.incrementFaultsReceiving(); - jmsListener.error(service, e); - log.error("Exception while processing incoming message", e); + try { + jmsListener.handleIncomingMessage( + msgContext, + JMSUtils.getTransportHeaders(message), + soapAction, + contentTypeInfo.getContentType()); + + } finally { + + Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY); + if (o != null) { + if ((o instanceof Boolean && ((Boolean) o)) || + (o instanceof String && Boolean.valueOf((String) o))) { + return false; + } } + return true; } } }
Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=730924&r1=730923&r2=730924&view=diff ============================================================================== --- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original) +++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Fri Jan 2 22:33:39 2009 @@ -16,6 +16,7 @@ package org.apache.axis2.transport.jms; import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.base.BaseUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,8 +41,8 @@ /** The naming context */ private Context context; /** - * this is a reference to the underlying JMS connection factory when sending messages - * through connection factories not defined to the transport sender + * this is a reference to the underlying JMS ConnectionFactory when sending messages + * through connection factories not defined at the TransportSender level */ private ConnectionFactory connectionFactory = null; /** @@ -52,31 +53,39 @@ private JMSConnectionFactory jmsConnectionFactory = null; /** the Destination queue or topic for the outgoing message */ private Destination destination = null; - /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE; + /** the Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC; /** the Reply Destination queue or topic for the outgoing message */ private Destination replyDestination = null; /** the Reply Destination name */ private String replyDestinationName = null; - /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE; + /** the Reply Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC; /** the EPR properties when the out-transport info is generated from a target EPR */ private Hashtable<String,String> properties = null; /** the target EPR string where applicable */ private String targetEPR = null; - private String contentType = null; - + /** the message property name that stores the content type of the outgoing message */ + private String contentTypeProperty; + /** * Creates an instance using the given JMS connection factory and destination * * @param jmsConnectionFactory the JMS connection factory * @param dest the destination + * @param contentTypeProperty */ - JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest) { + JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest, + String contentTypeProperty) { this.jmsConnectionFactory = jmsConnectionFactory; this.destination = dest; destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE; + this.contentTypeProperty = contentTypeProperty; } /** @@ -85,28 +94,31 @@ * @param targetEPR the target EPR */ JMSOutTransportInfo(String targetEPR) { + this.targetEPR = targetEPR; if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) { handleException("Invalid prefix for a JMS EPR : " + targetEPR); + } else { - properties = JMSUtils.getProperties(targetEPR); - String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE); - if(destinationType != null) { + properties = BaseUtils.getEPRProperties(targetEPR); + String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); + if (destinationType != null) { setDestinationType(destinationType); } - String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE); - if(replyDestinationType != null) { + + String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); + if (replyDestinationType != null) { setReplyDestinationType(replyDestinationType); } - String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); - if(replyDestinationName != null) { - setReplyDestinationName(replyDestinationName); - } + + replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); + contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); try { context = new InitialContext(properties); } catch (NamingException e) { handleException("Could not get an initial context using " + properties, e); } + destination = getDestination(context, targetEPR); replyDestination = getReplyDestination(context, targetEPR); } @@ -132,7 +144,7 @@ private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) { try { - String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM); + String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); if (conFacJndiName != null) { return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); } else { @@ -156,8 +168,12 @@ try { return JMSUtils.lookup(context, Destination.class, destinationName); } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate destination : " + destinationName + " using " + url); + try { + return JMSUtils.lookup(context, Destination.class, + (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? + "dynamicTopics/" : "dynamicQueues/") + destinationName); + } catch (NamingException x) { + handleException("Cannot locate destination : " + destinationName + " using " + url); } } catch (NamingException e) { handleException("Cannot locate destination : " + destinationName + " using " + url, e); @@ -173,10 +189,11 @@ * @return the JMS destination, or null if it does not exist */ private Destination getReplyDestination(Context context, String url) { - String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); + String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); if(replyDestinationName == null) { return null; } + try { return JMSUtils.lookup(context, Destination.class, replyDestinationName); } catch (NameNotFoundException e) { @@ -186,13 +203,14 @@ } catch (NamingException e) { handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e); } + return null; } /** * Look up for the given destination - * @param replyDest - * @return + * @param replyDest the JNDI name to lookup Destination required + * @return Destination for the JNDI name passed */ public Destination getReplyDestination(String replyDest) { try { @@ -232,7 +250,7 @@ } public void setContentType(String contentType) { - this.contentType = contentType; + // this is a useless Axis2 method imposed by the OutTransportInfo interface :( } public Hashtable<String,String> getProperties() { @@ -276,4 +294,12 @@ public void setReplyDestinationName(String replyDestinationName) { this.replyDestinationName = replyDestinationName; } + + public String getContentTypeProperty() { + return contentTypeProperty; + } + + public void setContentTypeProperty(String contentTypeProperty) { + this.contentTypeProperty = contentTypeProperty; + } } Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=730924&r1=730923&r2=730924&view=diff ============================================================================== --- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original) +++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Jan 2 22:33:39 2009 @@ -23,6 +23,7 @@ import org.apache.axiom.om.OMNode; import org.apache.axiom.om.ds.MapDataSource; import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.description.TransportOutDescription; @@ -30,20 +31,19 @@ import org.apache.axis2.transport.MessageFormatter; import org.apache.axis2.transport.OutTransportInfo; import org.apache.axis2.transport.base.*; +import org.apache.axis2.transport.base.streams.WriterOutputStream; import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.commons.logging.LogFactory; import javax.jms.*; -import javax.jms.Queue; import javax.activation.DataHandler; -import javax.naming.Context; import javax.xml.stream.XMLStreamException; - import java.beans.XMLDecoder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.nio.charset.UnsupportedCharsetException; import java.util.*; /** @@ -51,34 +51,23 @@ */ public class JMSSender extends AbstractTransportSender implements ManagementSupport { - public static final String TRANSPORT_NAME = "jms"; - - private JMSConnectionFactoryManager connFacManager; + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - public JMSSender() { - log = LogFactory.getLog(JMSSender.class); - } + /** The JMS connection factory manager to be used when sending messages out */ + private JMSConnectionFactoryManager connFacManager; /** * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. These will create sessions (one per each destination dealth with) - * to be used when messages are being sent. + * outgoing messages. + * * @param cfgCtx the configuration context * @param transportOut the transport sender definition from axis2.xml * @throws AxisFault on error */ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(cfgCtx); - // read the connection factory definitions and create them - connFacManager.loadConnectionFactoryDefinitions(transportOut); - connFacManager.start(); - } - - @Override - public void stop() { - connFacManager.stop(); - super.stop(); + connFacManager = new JMSConnectionFactoryManager(transportOut); + log.info("JMS Transport Sender initialized..."); } /** @@ -90,9 +79,9 @@ */ private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { Map<String,String> props = trpInfo.getProperties(); - if(trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM); - if(jmsConnectionFactoryName != null) { + if (trpInfo.getProperties() != null) { + String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); + if (jmsConnectionFactoryName != null) { return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); } else { return connFacManager.getJMSConnectionFactory(props); @@ -109,84 +98,85 @@ OutTransportInfo outTransportInfo) throws AxisFault { JMSConnectionFactory jmsConnectionFactory = null; - Connection connection = null; // holds a one time connection if used JMSOutTransportInfo jmsOut = null; - Session session = null; - Destination replyDestination = null; + JMSMessageSender messageSender = null; - try { - if (targetAddress != null) { + if (targetAddress != null) { - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? + jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - if (jmsConnectionFactory != null) { - // create new or get existing session to send to the destination from the CF - session = jmsConnectionFactory.getSessionForDestination( - JMSUtils.getDestination(targetAddress)); + } else { + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - } else { - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - try { - // create a one time connection and session to be used - Hashtable<String,String> jndiProps = jmsOut.getProperties(); - String user = jndiProps.get(Context.SECURITY_PRINCIPAL); - String pass = jndiProps.get(Context.SECURITY_CREDENTIALS); - - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { - qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { - tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); - } else { - handleException("Unable to determine type of JMS " + - "Connection Factory - i.e Queue/Topic"); - } + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } + jmsOut = (JMSOutTransportInfo) outTransportInfo; + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { - session = ((QueueConnection)connection). - createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { - session = ((TopicConnection)connection). - createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - } + // The message property to be used to send the content type is determined by + // the out transport info, i.e. either from the EPR if we are sending a request, + // or, if we are sending a response, from the configuration of the service that + // received the request). The property name can be overridden by a message + // context property. + String contentTypeProperty = + (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + if (contentTypeProperty == null) { + contentTypeProperty = jmsOut.getContentTypeProperty(); + } - } catch (JMSException e) { - handleException("Error creating a connection/session for : " + targetAddress, e); - } - } - replyDestination = jmsOut.getReplyDestination(); + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { + try { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + } finally { + messageSender.close(); + } + } + } - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + /** + * Perform actual sending of the JMS message + */ + private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, + String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, + JMSOutTransportInfo jmsOut) throws AxisFault { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + String correlationId = null; + try { + message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); + } catch (JMSException e) { + handleException("Error creating a JMS message from the message context", e); + } - jmsOut = (JMSOutTransportInfo) outTransportInfo; - jmsConnectionFactory = jmsOut.getJmsConnectionFactory(); + // should we wait for a synchronous response on this same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + Destination replyDestination = jmsOut.getReplyDestination(); - session = jmsConnectionFactory.getSessionForDestination( - jmsOut.getDestination().toString()); - } - - Destination destination = jmsOut.getDestination(); + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); + if (replyDestName == null && jmsConnectionFactory != null) { + replyDestName = jmsConnectionFactory.getReplyToDestination(); + } + if (replyDestName != null) { if (jmsConnectionFactory != null) { replyDestination = jmsConnectionFactory.getDestination(replyDestName); @@ -194,103 +184,45 @@ replyDestination = jmsOut.getReplyDestination(replyDestName); } } + replyDestination = JMSUtils.setReplyDestination( + replyDestination, messageSender.getSession(), message); + } - if(session == null) { - handleException("Could not create JMS session"); - } - - // now we are going to use the JMS session, but if this was a session from a - // defined JMS connection factory, we need to synchronize as sessions are not - // thread safe - synchronized(session) { - - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, session); - } catch (JMSException e) { - handleException("Error creating a JMS message from the axis message context", e); - } - - String destinationType = jmsOut.getDestinationType(); - - // if the destination does not exist, see if we can create it - destination = JMSUtils.createDestinationIfRequired( - destination, destinationType, targetAddress, session); - - if(jmsOut.getReplyDestinationName() != null) { - replyDestination = JMSUtils.createReplyDestinationIfRequired( - replyDestination, jmsOut.getReplyDestinationName(), - jmsOut.getReplyDestinationType(), targetAddress, session); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - replyDestination = JMSUtils.setReplyDestination( - replyDestination, session, message); - } + try { + messageSender.send(message, msgCtx); + metrics.incrementMessagesSent(msgCtx); - // send the outgoing message over JMS to the destination selected - try { - JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message); + } catch (AxisJMSException e) { + metrics.incrementFaultsSending(); + handleException("Error sending JMS message", e); + } - // set the actual MessageID to the message context for use by any others - try { - String msgId = message.getJMSMessageID(); - if (msgId != null) { - msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); - } - } catch (JMSException ignore) {} + try { + metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } - metrics.incrementMessagesSent(); - try { - if (message instanceof BytesMessage) { - metrics.incrementBytesSent(JMSUtils.getBodyLength((BytesMessage) message)); - } else if (message instanceof MapMessage) { - metrics.incrementBytesSent((JMSUtils.getBodyLength((MapMessage) message))); - } else if (message instanceof TextMessage) { - metrics.incrementBytesSent(( - (TextMessage) message).getText().getBytes().length); - } else { - handleException("Unsupported JMS message type : " + - message.getClass().getName()); - } - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - } catch (BaseTransportException e) { - metrics.incrementFaultsSending(); - throw e; - } + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + // TODO ******************************************************************************** + // TODO **** replace with asynchronous polling via a poller task to process this ******* + // information would be given. Then it should poll (until timeout) the + // requested destination for the response message and inject it from a + // asynchronous worker thread + try { + messageSender.getConnection().start(); // multiple calls are safely ignored + } catch (JMSException ignore) {} - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - if (connection != null) { - try { - connection.start(); - } catch (JMSException ignore) {} - } else { - // If connection is null, we are using a cached session and the underlying - // connection is already started. Thus, there is nothing to do here. - } - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - waitForResponseAndProcess(session, replyDestination, - jmsOut.getReplyDestinationType(), msgCtx, correlationId); - } - } + try { + correlationId = message.getJMSMessageID(); + } catch(JMSException ignore) {} - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignore) {} - } + // We assume here that the response uses the same message property to + // specify the content type of the message. + waitForResponseAndProcess(messageSender.getSession(), replyDestination, + msgCtx, correlationId, contentTypeProperty); + // TODO ******************************************************************************** } } @@ -301,28 +233,18 @@ * @param session the session to use to listen for the response * @param replyDestination the JMS reply Destination * @param msgCtx the outgoing message for which we are expecting the response + * @param contentTypeProperty the message property used to determine the content type + * of the response message * @throws AxisFault on error */ private void waitForResponseAndProcess(Session session, Destination replyDestination, - String replyDestinationType, MessageContext msgCtx, String correlationId) throws AxisFault { + MessageContext msgCtx, String correlationId, + String contentTypeProperty) throws AxisFault { try { - MessageConsumer consumer = null; - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(replyDestinationType)) { - if (correlationId != null) { - consumer = ((QueueSession) session).createReceiver((Queue) replyDestination, - "JMSCorrelationID = '" + correlationId + "'"); - } else { - consumer = ((QueueSession) session).createReceiver((Queue) replyDestination); - } - } else { - if (correlationId != null) { - consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination, - "JMSCorrelationID = '" + correlationId + "'", false); - } else { - consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination); - } - } + MessageConsumer consumer; + consumer = JMSUtils.createConsumer(session, replyDestination, + "JMSCorrelationID = '" + correlationId + "'"); // how long are we willing to wait for the sync response long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; @@ -344,23 +266,13 @@ // update transport level metrics metrics.incrementMessagesReceived(); try { - if (reply instanceof BytesMessage) { - metrics.incrementBytesReceived(JMSUtils.getBodyLength((BytesMessage) reply)); - } else if (reply instanceof MapMessage) { - metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) reply))); - } else if (reply instanceof TextMessage) { - metrics.incrementBytesReceived(( - (TextMessage) reply).getText().getBytes().length); - } else { - handleException("Unsupported JMS message type : " + - reply.getClass().getName()); - } + metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); } catch (JMSException e) { log.warn("Error reading JMS message size to update transport metrics", e); } try { - processSyncResponse(msgCtx, reply); + processSyncResponse(msgCtx, reply, contentTypeProperty); metrics.incrementMessagesReceived(); } catch (AxisFault e) { metrics.incrementFaultsReceiving(); @@ -376,8 +288,9 @@ } catch (JMSException e) { metrics.incrementFaultsReceiving(); - handleException("Error creating consumer or receiving reply to : " + - replyDestination, e); + handleException("Error creating a consumer, or receiving a synchronous reply " + + "for outgoing MessageContext ID : " + msgCtx.getMessageID() + + " and reply Destination : " + replyDestination, e); } } @@ -387,12 +300,14 @@ * * @param msgContext the MessageContext * @param session the JMS session + * @param contentTypeProperty the message property to be used to store the + * content type * @return a JMS message from the context and session * @throws JMSException on exception * @throws AxisFault on exception */ - private Message createJMSMessage(MessageContext msgContext, Session session) - throws JMSException, AxisFault { + private Message createJMSMessage(MessageContext msgContext, Session session, + String contentTypeProperty) throws JMSException, AxisFault { Message message = null; String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); @@ -416,20 +331,7 @@ String contentType = messageFormatter.getContentType( msgContext, format, msgContext.getSoapAction()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - messageFormatter.writeTo(msgContext, format, baos, true); - baos.flush(); - } catch (IOException e) { - handleException("IO Error while creating BytesMessage", e); - } - - if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || - contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) { - message = session.createBytesMessage(); - BytesMessage bytesMsg = (BytesMessage) message; - bytesMsg.writeBytes(baos.toByteArray()); - } else if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) { + if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) { OMElement wrapper = msgContext.getEnvelope().getBody().getFirstElement(); if (wrapper != null && wrapper instanceof OMSourcedElement) { OMSourcedElement omNode = (OMSourcedElement) wrapper; @@ -499,15 +401,43 @@ } } } else { - message = session.createTextMessage(); // default - TextMessage txtMsg = (TextMessage) message; + boolean useBytesMessage = + msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || + contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; + + OutputStream out; + StringWriter sw; + if (useBytesMessage) { + BytesMessage bytesMsg = session.createBytesMessage(); + sw = null; + out = new BytesMessageOutputStream(bytesMsg); + message = bytesMsg; + } else { + sw = new StringWriter(); + try { + out = new WriterOutputStream(sw, format.getCharSetEncoding()); + } catch (UnsupportedCharsetException ex) { + handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); + return null; + } + } + try { - txtMsg.setText(new String(baos.toByteArray(), format.getCharSetEncoding())); - } catch (UnsupportedEncodingException ex) { - handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); + messageFormatter.writeTo(msgContext, format, out, true); + out.close(); + } catch (IOException e) { + handleException("IO Error while creating BytesMessage", e); } + + if (!useBytesMessage) { + TextMessage txtMsg = session.createTextMessage(); + txtMsg.setText(sw.toString()); + message = txtMsg; + } + } + if (contentTypeProperty != null) { + message.setStringProperty(contentTypeProperty, contentType); } - message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType); } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { message = session.createBytesMessage(); @@ -518,14 +448,12 @@ if (omNode != null && omNode instanceof OMText) { Object dh = ((OMText) omNode).getDataHandler(); if (dh != null && dh instanceof DataHandler) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { - ((DataHandler) dh).writeTo(baos); + ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); } catch (IOException e) { handleException("Error serializing binary content of element : " + BaseConstants.DEFAULT_BINARY_WRAPPER, e); } - bytesMsg.writeBytes(baos.toByteArray()); } } @@ -534,6 +462,7 @@ TextMessage txtMsg = (TextMessage) message; txtMsg.setText(msgContext.getEnvelope().getBody(). getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); + } else if (JMSConstants.JMS_MAP_MESSAGE.equals(jmsPayloadType)) { OMElement wrapper = msgContext.getEnvelope().getBody(). getFirstChildWithName(BaseConstants.DEFAULT_MAP_WRAPPER); @@ -656,9 +585,12 @@ * * @param outMsgCtx the outgoing message for which we are expecting the response * @param message the JMS response message received + * @param contentTypeProperty the message property used to determine the content type + * of the response message * @throws AxisFault on error */ - private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault { + private void processSyncResponse(MessageContext outMsgCtx, Message message, + String contentTypeProperty) throws AxisFault { MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); @@ -671,7 +603,9 @@ // workaround as Axis2 1.2 is about to be released and Synapse 1.0 responseMsgCtx.setServerSide(false); - String contentType = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE); + String contentType = + contentTypeProperty == null ? null + : JMSUtils.getProperty(message, contentTypeProperty); try { JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);
