http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java b/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java deleted file mode 100644 index d04539f..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java +++ /dev/null @@ -1,1114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.services.messaging.adapters; - -import java.util.Collection; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import javax.jms.JMSException; -import javax.naming.Context; - -import flex.management.runtime.messaging.services.messaging.adapters.JMSAdapterControl; -import flex.messaging.Destination; -import flex.messaging.MessageClient; -import flex.messaging.MessageClientListener; -import flex.messaging.MessageDestination; -import flex.messaging.MessageException; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.ConfigurationException; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.ErrorMessage; -import flex.messaging.messages.Message; -import flex.messaging.messages.MessagePerformanceInfo; -import flex.messaging.messages.MessagePerformanceUtils; -import flex.messaging.services.MessageService; -import flex.messaging.services.messaging.adapters.JMSSettings.DeliverySettings; - -/** - * This adapter for the MessageService integrates Flex messaging - * with Java Message Service destinations. - */ -public class JMSAdapter extends MessagingAdapter implements JMSConfigConstants, JMSExceptionListener, JMSMessageListener, MessageClientListener -{ - public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE_JMS; - private static final String DURABLE_SUBSCRIBER_NAME_PREFIX = "FlexClient_"; - - // Note that clientId is kept track as Object (instead of String) in all of - // these data structures because in clustering, clientId is not a String, - // it's an instance of org.jgroups.stack.IpAddress instead. 
- private Map<JMSConsumer, Object> consumerToClientId; - private Map<Object, MessageClient> messageClients; - private LinkedList<JMSProducer> topicProducers; - private Map<Object, JMSConsumer> topicConsumers; - private LinkedList<JMSProducer> queueProducers; - private Map<Object, JMSConsumer> queueConsumers; - - // JMSAdapter properties - private JMSSettings settings; - private JMSAdapterControl controller; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>JMSAdapter</code> instance. - */ - public JMSAdapter() - { - this(false); - } - - /** - * Constructs a <code>JMSAdapter</code> instance. - * - * @param enableManagement <code>true</code> if the <code>JMSAdapter</code> - * has a corresponding MBean control for management; otherwise <code>false</code>. - */ - public JMSAdapter(boolean enableManagement) - { - super(enableManagement); - consumerToClientId = new ConcurrentHashMap<JMSConsumer, Object>(); - messageClients = new ConcurrentHashMap<Object, MessageClient>(); - topicProducers = new LinkedList<JMSProducer>(); - topicConsumers = new ConcurrentHashMap<Object, JMSConsumer>(); - queueProducers = new LinkedList<JMSProducer>(); - queueConsumers = new ConcurrentHashMap<Object, JMSConsumer>(); - settings = new JMSSettings(); - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initializes the <code>JMSAdapter</code> with the properties. - * - * @param id The id of the <code>JMSAdapter</code>. - * @param properties Properties for the <code>JMSAdapter</code>. 
- */ - @Override - public void initialize(String id, ConfigMap properties) - { - super.initialize(id, properties); - - if (properties == null || properties.size() == 0) - return; - - // JMS specific properties - jms(properties); - } - - /** - * Verifies that the <code>JMSAdapter</code> is in valid state before - * it is started. - */ - @Override - protected void validate() - { - if (isValid()) - return; - - super.validate(); - - if (settings.getConnectionFactory() == null) - { - // JMS connection factory of message destinations with JMS Adapters must be specified. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(MISSING_CONNECTION_FACTORY); - throw ce; - } - - if (settings.getDestinationJNDIName() == null) - { - // JNDI names for message destinations with JMS Adapters must be specified. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME); - throw ce; - } - - if (settings.getMessageType() == null) - { - // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, and javax.jms.MapMessage. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(INVALID_JMS_MESSAGE_TYPE, new Object[] {null}); - throw ce; - } - } - - /** - * Starts the adapter. - */ - @Override - public void start() - { - if (isStarted()) - return; - - super.start(); - - // Add JMS adapter as a MessageClient created listener so that its - // JMS consumers can be associated with their message clients. - MessageClient.addMessageClientCreatedListener(this); - } - - /** - * Stops the adapter. 
- */ - @Override - public void stop() - { - if (!isStarted()) - return; - - super.stop(); - - stopConsumers(topicConsumers.values()); - stopConsumers(queueConsumers.values()); - } - - //-------------------------------------------------------------------------- - // - // Public methods - // - //-------------------------------------------------------------------------- - - /** - * Casts the <code>Destination</code> into <code>MessageDestination</code> - * and calls super.setDestination. - * - * @param destination The destination of the adapter. - */ - @Override - public void setDestination(Destination destination) - { - MessageDestination dest = (MessageDestination)destination; - super.setDestination(dest); - } - - /** - * Gets the <code>JMSSettings</code> of the <code>JMSAdapter</code>. - * - * @return <code>JMSSettings</code> of the <code>JMSAdapter</code>. - */ - public JMSSettings getJMSSettings() - { - return settings; - } - - /** - * Sets the <code>JMSSettings</code> of the <code>JMSAdapter</code>. - * - * @param jmsSettings <code>JMSSettings</code> of the <code>JMSAdapter</code>. - */ - public void setJMSSettings(JMSSettings jmsSettings) - { - this.settings = jmsSettings; - } - - /** - * Returns the count of queue consumers managed by this adapter. - * - * @return The count of queue consumers managed by this adapter. - */ - public int getQueueConsumerCount() - { - return queueConsumers.size(); - } - - /** - * Returns the ids of all queue consumers. - * - * @return The ids of all queue consumers. - */ - public String[] getQueueConsumerIds() - { - Set<Object> consumerIds = queueConsumers.keySet(); - if (consumerIds != null) - { - String[] ids = new String[consumerIds.size()]; - return consumerIds.toArray(ids); - } - return new String[0]; - } - - /** - * Returns the count of topic consumers currently managed by this adapter. - * - * @return The count of topic consumers currently managed by this adapter. 
- */ - public int getTopicConsumerCount() - { - return topicConsumers.size(); - } - - /** - * Returns the ids of all topic consumers. - * - * @return The ids of all topic consumers. - */ - public String[] getTopicConsumerIds() - { - Set<Object> consumerIds = topicConsumers.keySet(); - if (consumerIds != null) - { - String[] ids = new String[consumerIds.size()]; - return consumerIds.toArray(ids); - } - return new String[0]; - } - - /** - * Returns the count of topic producers currently managed by this adapter. - * - * @return The count of topic producers currently managed by this adapter. - */ - public int getTopicProducerCount() - { - return topicProducers.size(); - } - - /** - * Returns the count of queue producers currently managed by this adapter. - * - * @return The count of queue producers currently managed by this adapter. - */ - public int getQueueProducerCount() - { - return queueProducers.size(); - } - - /** - * Implements JMSExceptionListener. - * When a JMSConsumer receives a JMS exception from its underlying JMS - * connection, it dispatches a JMS exception event to pass the exception - * to the JMS adapter. - * - * @param evt The <code>JMSExceptionEvent</code>. - */ - public void exceptionThrown(JMSExceptionEvent evt) - { - JMSConsumer consumer = (JMSConsumer)evt.getSource(); - JMSException jmsEx = evt.getJMSException(); - - // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' encountered an error during message delivery: {1} - MessageException messageEx = new MessageException(); - messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR, new Object[] {consumer.getDestinationJndiName(), jmsEx.getMessage()}); - removeConsumer(consumer, true, true, messageEx.createErrorMessage()); - } - - /** - * JMS adapter handles its subscriptions so this returns <code>true</code>. - * - * @return <code>true</code>. 
- */ - @Override - public boolean handlesSubscriptions() - { - return true; - } - - /** - * Publish a message to this adapter's JMS destination. - * - * @param message The Flex message to publish. - * @return The body of the acknowledge message which is null is this case. - */ - @SuppressWarnings("unchecked") - @Override - public Object invoke(Message message) - { - JMSProducer producer = null; - - // named Flex message props become JMS headers - Map msgProps = message.getHeaders(); - msgProps.put(JMSConfigConstants.TIME_TO_LIVE, new Long(message.getTimeToLive())); - - if (settings.getDestinationType().equals(TOPIC)) - { - synchronized (topicProducers) - { - if (topicProducers.size() < settings.getMaxProducers()) - { - producer = new JMSTopicProducer(); - try - { - producer.initialize(settings); - producer.start(); - } - catch (Exception e) - { - throw constructMessageException(e); - } - } - else - { - producer = topicProducers.removeFirst(); - } - - topicProducers.addLast(producer); - } - } - else if (settings.getDestinationType().equals(QUEUE)) - { - synchronized (queueProducers) - { - if (queueProducers.size() < settings.getMaxProducers()) - { - producer = new JMSQueueProducer(); - try - { - producer.initialize(settings); - producer.start(); - } - catch (Exception e) - { - throw constructMessageException(e); - } - } - else - { - producer = queueProducers.removeFirst(); - } - - queueProducers.addLast(producer); - } - } - - try - { - if (producer != null) - producer.sendMessage(message); - } - catch (JMSException jmsEx) - { - // At this point we give up on this producer, so we just - // stop and remove it from the pool. 
- if (settings.getDestinationType().equals(TOPIC)) - { - synchronized (topicProducers) - { - if (producer != null) - { - producer.stop(); - topicProducers.remove(producer); - } - } - } - else if (settings.getDestinationType().equals(QUEUE)) - { - synchronized (queueProducers) - { - if (producer != null) - { - producer.stop(); - queueProducers.remove(producer); - } - } - } - - throw constructMessageException(jmsEx); - } - - return null; - } - - /** - * Handle a CommandMessage sent by this adapter's service. - * - * @param commandMessage The command message to manage. - * @return The result of manage which is null in this case. - */ - @Override - public Object manage(CommandMessage commandMessage) - { - JMSConsumer consumer = null; - Object clientId = commandMessage.getClientId(); - - if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) - { - // Keep track of the selector expression. - Object selectorExpression = commandMessage.getHeaders().get(CommandMessage.SELECTOR_HEADER); - - // Create a JMSConsumer for this destination and associate it with the client id - if (settings.getDestinationType().equals(TOPIC)) - { - MessageClient existingMessageClient = null; - // This could happen when client disconnects without unsubscribing first. - if (topicConsumers.containsKey(clientId)) - { - removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null); - existingMessageClient = messageClients.get(clientId); - } - // Create the consumer. - consumer = new JMSTopicConsumer(); - consumer.initialize(settings); - if (selectorExpression != null) - consumer.setSelectorExpression((String)selectorExpression); - // Need to build a subscription name, in case durable subscriptions are used. - ((JMSTopicConsumer)consumer).setDurableSubscriptionName(buildSubscriptionName(clientId)); - consumer.setMessageReceiver(buildMessageReceiver(consumer)); - - // Add JMSAdapter as JMS exception and message listener. 
- consumer.addJMSExceptionListener(this); - consumer.addJMSMessageListener(this); - topicConsumers.put(clientId, consumer); - consumerToClientId.put(consumer, clientId); - - // Means client was disconnected without unsubscribing, hence no - // new message client will be created. Make sure the old one is - // wired up with the new JMS consumer properly. - if (existingMessageClient != null) - messageClientCreated(existingMessageClient); - } - else if (settings.getDestinationType().equals(QUEUE)) - { - MessageClient existingMessageClient = null; - if (queueConsumers.containsKey(clientId)) - { - removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null); - existingMessageClient = messageClients.get(clientId); - } - // Create the consumer. - consumer = new JMSQueueConsumer(); - consumer.initialize(settings); - if (selectorExpression != null) - consumer.setSelectorExpression((String)selectorExpression); - consumer.setMessageReceiver(buildMessageReceiver(consumer)); - - // Add JMSAdapter as JMS exception and message listener. - consumer.addJMSExceptionListener(this); - consumer.addJMSMessageListener(this); - queueConsumers.put(clientId, consumer); - consumerToClientId.put(consumer, clientId); - - // Means client was disconnected without unsubscribing, hence no - // new message client will be created. Make sure the old one is - // wired up with the new JMS consumer properly. - if (existingMessageClient != null) - messageClientCreated(existingMessageClient); - } - } - - else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) - { - // Determines if the durable subscription should be unsubscribed - // when the JMS consumer is removed. 
- boolean unsubscribe = true; - - boolean preserveDurable = false; - if (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER) != null) - preserveDurable = ((Boolean)(commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER))).booleanValue(); - - // Don't destroy a durable subscription if the MessageClient's session has been invalidated. - // or this is a JMS durable connection that has requested to be undestroyed - if (commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) != null - && ((Boolean)commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER)).booleanValue() - || preserveDurable) - unsubscribe = false; - - removeConsumer(clientId, unsubscribe, false, null); - } - - // CommandMessage.POLL_OPERATION handling is left to the Endpoint - // hence not handled by this adapter. - - return null; - } - - /** - * Implements MessageClientListener. - * When a MessageClient is created, JMSAdapter looks up its JMSConsumers - * and if there is a JMSConsumer that has the same clientId as MessageClient, - * it adds the MessageClient to its list clients. This helps in keeping both - * sides of the bridge (MessageClient and JMSConsumer) notified when there's - * a failure on either side of the bridge. - * - * @param messageClient The newly created MessageClient. - */ - public void messageClientCreated(MessageClient messageClient) - { - Object clientId = messageClient.getClientId(); - JMSConsumer consumer = null; - if (topicConsumers.containsKey(clientId)) - consumer = topicConsumers.get(clientId); - else if (queueConsumers.containsKey(clientId)) - consumer = queueConsumers.get(clientId); - - // If there is a JMSConsumer created for the same clientId, register - // the MessageClient with JMSAdapter and start the consumer. - if (consumer != null) - { - messageClients.put(clientId, messageClient); - try - { - consumer.start(); - // Add JMS adapter as a client destroyed listener, so client - // invalidation (eg. 
due to session timeout) can be handled properly. - messageClient.addMessageClientDestroyedListener(this); - } - catch (MessageException messageEx) - { - removeConsumer(consumer, true, true, messageEx.createErrorMessage()); - } - catch (Exception ex) - { - removeConsumer(consumer, true, true, constructMessageException(ex).createErrorMessage()); - } - } - } - - /** - * Implements MessageClientListener. - * When a MessageClient is destroyed (usually due to session timeout), its - * corresponding JMS consumer is removed. Note that this might have already - * happened if the client first unsubscribes and in that case, this is a no-op. - * - * @param messageClient The MessageClient that was destroyed. - */ - public void messageClientDestroyed(MessageClient messageClient) - { - Object clientId = messageClient.getClientId(); - removeConsumer(clientId); - messageClients.remove(clientId); - } - - /** - * Implements JMSMessageListener. - * When a JMSConsumer receives a JMS message, it dispatched a JMS message - * event to pass the message to the JMS adapter. - * - * @param evt The <code>JMSMessageEvent</code>. - */ - public void messageReceived(JMSMessageEvent evt) - { - JMSConsumer consumer = (JMSConsumer)evt.getSource(); - javax.jms.Message jmsMessage = evt.getJMSMessage(); - - flex.messaging.messages.AsyncMessage flexMessage = convertToFlexMessage(jmsMessage, consumer); - if (flexMessage != null) - { - MessagePerformanceUtils.markServerPostAdapterExternalTime(flexMessage); - ((MessageService)getDestination().getService()).serviceMessageFromAdapter(flexMessage, false); - } - } - - /** - * Removes (unsubscribes) the specified consumer. By default, it removes - * the durable subscription and pushes a generic error message to the client - * before MessageClient invalidation. - * - * @param clientId The identifier for the consumer to remove. 
- */ - public void removeConsumer(Object clientId) - { - // Client is unsubscribed because its corresponding JMS consumer has been removed from the JMS adapter. - MessageException messageEx = new MessageException(); - messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL); - removeConsumer(clientId, true, true, messageEx.createErrorMessage()); - } - - //-------------------------------------------------------------------------- - // - // Protected and Private methods - // - //-------------------------------------------------------------------------- - - /** - * Removes (unsubscribes) the JMSConsumer associated with the clientId. - * - * @param clientId The clientId associated with the JMSConsumer to remove. - * @param unsubscribe Whether to unsubscribe the durable subscription or not. - * @param invalidate Whether to invalidate the MessageClient or not. - * @param invalidateMessage A message to push to the client before consumer - * is removed and its MessageClient is invalidated. If the message is null, - * MessageClient is invalidated silently. - */ - protected void removeConsumer(Object clientId, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) - { - JMSConsumer consumer = null; - if (topicConsumers.containsKey(clientId)) - consumer = topicConsumers.get(clientId); - else if (queueConsumers.containsKey(clientId)) - consumer = queueConsumers.get(clientId); - - removeConsumer(consumer, unsubscribe, invalidate, invalidateMessage); - } - - /** - * Removes (unsubscribes) the specified consumer. - * - * @param consumer The JMSConsumer instance to remove. - * @param unsubscribe Whether to unsubscribe the durable subscription or not. - * @param invalidate Whether to invalidate the MessageClient or not. - * @param invalidateMessage A message to push to the client before consumer - * is removed and its MessageClient is invalidated. If the message is null, - * MessageClient is invalidated silently. 
- */ - protected void removeConsumer(JMSConsumer consumer, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) - { - if (consumer == null) - return; - - Object clientId = consumerToClientId.get(consumer); - if (clientId == null) - return; - - if (Log.isInfo()) - { - String logMessage = "JMS consumer for JMS destination '" + consumer.getDestinationJndiName() - + "' is being removed from the JMS adapter"; - - if (invalidateMessage != null) - logMessage += " due to the following error: " + invalidateMessage.faultString; - - Log.getLogger(JMSAdapter.LOG_CATEGORY).info(logMessage); - } - - consumer.removeJMSExceptionListener(this); - consumer.removeJMSMessageListener(this); - consumer.stop(unsubscribe); - if (invalidate) - invalidateMessageClient(consumer, invalidateMessage); - if (consumer instanceof JMSTopicConsumer) - topicConsumers.remove(clientId); - else // assuming JMSQueueConsumer. - queueConsumers.remove(clientId); - consumerToClientId.remove(consumer); - // Message client will be removed in messageClientDestroyed. - } - - /** - * Invoked automatically to allow the <code>JMSAdapter</code> to setup its corresponding - * MBean control. - * - * @param destination The <code>Destination</code> that manages this <code>JMSAdapter</code>. - */ - @Override - protected void setupAdapterControl(Destination destination) - { - controller = new JMSAdapterControl(this, destination.getControl()); - controller.register(); - setControl(controller); - } - - /** - * Builds a MessageReceiver for JMSConsumer from DeliverySettings. - * - * @param consumer The <code>JMSConsumer</code>. - * @return MessageReceiver configured for JMSConsumer per DeliverySettings. 
- */ - private MessageReceiver buildMessageReceiver(JMSConsumer consumer) - { - DeliverySettings deliverySettings = settings.getDeliverySettings(); - if (deliverySettings.getMode().equals(JMSConfigConstants.ASYNC)) - return new AsyncMessageReceiver(consumer); - SyncMessageReceiver syncMessageReceiver = new SyncMessageReceiver(consumer); - syncMessageReceiver.setSyncReceiveIntervalMillis(deliverySettings.getSyncReceiveIntervalMillis()); - syncMessageReceiver.setSyncReceiveWaitMillis(deliverySettings.getSyncReceiveWaitMillis()); - return syncMessageReceiver; - } - - /** - * Prefixes a clientId with DURABLE_SUBSCRIBER_NAME_PREFIX to build a - * subscription name to be used in JMSConsumers with durable connections. - */ - private String buildSubscriptionName(Object clientId) - { - return DURABLE_SUBSCRIBER_NAME_PREFIX + clientId.toString(); - } - - /** - * Construct a MessageException for the JMS invocation - * @param e the Exception caught in the JMS invocation - * @return MessageException encapsulates the JMS Exception message - */ - private MessageException constructMessageException(Exception e) - { - MessageException messageEx = new MessageException(); - messageEx.setMessage(JMSINVOCATION_EXCEPTION, new Object[] { e.getMessage() }); - return messageEx; - } - - /** - * Convert from a <code>javax.jms.Message</code> type to the <code>flex.messaging.messages.AsyncMessage</code> type. - * Supported types are <code>javax.jms.TextMessage</code>, <code>javax.jms.ObjectMessage</code>, - * and <code>javax.jms.MapMessage</code>. 
- */ - private flex.messaging.messages.AsyncMessage convertToFlexMessage(javax.jms.Message jmsMessage, JMSConsumer consumer) - { - flex.messaging.messages.AsyncMessage flexMessage = null; - flexMessage = new flex.messaging.messages.AsyncMessage(); - - Object clientId = consumerToClientId.get(consumer); - if (clientId == null) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered a null clientId during JMS to Flex message conversion"); - - return null; - } - flexMessage.setClientId(clientId); - - - flexMessage.setDestination(getDestination().getId()); - - // Set JMSMessageID header as Flex messageId property. - try - { - flexMessage.setMessageId(jmsMessage.getJMSMessageID()); - } - catch (JMSException jmsEx) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message id during JMS to Flex message conversion: " + jmsEx.getMessage()); - } - - // Set JMSTimestamp header as Flex timestamp property. - try - { - flexMessage.setTimestamp(jmsMessage.getJMSTimestamp()); - } - catch (JMSException jmsEx) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS timestamp during JMS to Flex message conversion: " + jmsEx.getMessage()); - } - - // Set JMS headers and Flex headers. - if (settings.isPreserveJMSHeaders()) - { - // Set standard JMS headers except JMSMessageId and JMSTimestamp, - // as they are already set on the Flex message directly. 
- try - { - flexMessage.setHeader(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID()); - flexMessage.setHeader(JMS_DELIVERY_MODE, Integer.toString(jmsMessage.getJMSDeliveryMode())); - flexMessage.setHeader(JMS_DESTINATION, jmsMessage.getJMSDestination().toString()); - flexMessage.setHeader(JMS_EXPIRATION, Long.toString(jmsMessage.getJMSExpiration())); - flexMessage.setHeader(JMS_PRIORITY, Integer.toString(jmsMessage.getJMSPriority())); - flexMessage.setHeader(JMS_REDELIVERED, Boolean.toString(jmsMessage.getJMSRedelivered())); - flexMessage.setHeader(JMS_REPLY_TO, jmsMessage.getJMSReplyTo()); - flexMessage.setHeader(JMS_TYPE, jmsMessage.getJMSType()); - } - catch (JMSException jmsEx) - { - // These should not cause errors to be pushed to Flash clients - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS headers during JMS to Flex conversion: " + jmsEx.getMessage()); - - } - } - - // Set JMS properties as Flex headers. - - // While iterating through JMS message properties, build a message - // performance object to send back to the client with the message - // properties starting with with MPI_HEADER_IN (if any). - MessagePerformanceInfo mpi = null; - - try - { - for (Enumeration propEnum = jmsMessage.getPropertyNames(); propEnum.hasMoreElements();) - { - String propName = (String)propEnum.nextElement(); - try - { - Object propValue = jmsMessage.getObjectProperty(propName); - if (propName.startsWith(MessagePerformanceUtils.MPI_HEADER_IN)) - { - if (mpi == null) - mpi = new MessagePerformanceInfo(); - propName = propName.substring(MessagePerformanceUtils.MPI_HEADER_IN.length()); - java.lang.reflect.Field field; - try - { - field = mpi.getClass().getField(propName); - field.set(mpi, propValue); - } - catch (Exception ignore) - { - // Simply don't set the property if the value cannot be retrieved. 
- } - } - else - { - flexMessage.setHeader(propName, propValue); - } - } - catch (JMSException jmsEx) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage()); - } - } - - if (mpi != null) - flexMessage.setHeader(MessagePerformanceUtils.MPI_HEADER_IN, mpi); - } - catch (JMSException jmsEx) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage()); - } - - // Finally, set the JMS message body of the Flex message body. - try - { - if (jmsMessage instanceof javax.jms.TextMessage) - { - javax.jms.TextMessage textMessage = (javax.jms.TextMessage)jmsMessage; - flexMessage.setBody(textMessage.getText()); - } - else if (jmsMessage instanceof javax.jms.ObjectMessage) - { - javax.jms.ObjectMessage objMessage = (javax.jms.ObjectMessage)jmsMessage; - flexMessage.setBody(objMessage.getObject()); - } - else if (jmsMessage instanceof javax.jms.MapMessage) - { - javax.jms.MapMessage mapMessage = (javax.jms.MapMessage)jmsMessage; - @SuppressWarnings("unchecked") - Enumeration names = mapMessage.getMapNames(); - Map<String, Object> body = new HashMap<String, Object>(); - while (names.hasMoreElements()) - { - String name = (String)names.nextElement(); - body.put(name, mapMessage.getObject(name)); - } - flexMessage.setBody(body); - } - } - catch (JMSException jmsEx) - { - if (Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message body during JMS to Flex conversion: " + jmsEx.getMessage()); - } - - return flexMessage; - } - - /** - * Invalidates the MessageClient associated with the JMSConsumer with the - * supplied error message. - * - * @param consumer The JMSConsumer whose MessageClient will be invalidated. 
-     * @param message The error message to push out before invalidating the
-     * MessageClient. If the message is null, MessageClient is invalidated
-     * silently.
-     */
-    private void invalidateMessageClient(JMSConsumer consumer, flex.messaging.messages.Message message)
-    {
-        // Nothing is invalidated unless the consumer maps to a client id that
-        // also has a MessageClient registered with this adapter.
-        Object clientId = consumerToClientId.get(consumer);
-        if (clientId != null && messageClients.containsKey(clientId))
-        {
-            MessageClient messageClient = messageClients.get(clientId);
-
-            if (Log.isInfo())
-                Log.getLogger(JMSAdapter.LOG_CATEGORY).info("The corresponding MessageClient for JMS consumer for JMS destination '"
-                        + consumer.getDestinationJndiName() + "' is being invalidated");
-
-            messageClient.invalidate(message);
-        }
-    }
-
-    /**
-     * Handle JMS specific configuration: reads the JMS section of the adapter
-     * properties, if there is one, and copies its values into the adapter's
-     * <code>JMSSettings</code>.
-     *
-     * @param properties The adapter properties that may contain a JMS section.
-     */
-    private void jms(ConfigMap properties)
-    {
-        ConfigMap jms = properties.getPropertyAsMap(JMS, null);
-        if (jms != null)
-        {
-            String destType = jms.getPropertyAsString(DESTINATION_TYPE, defaultDestinationType);
-            settings.setDestinationType(destType);
-
-            String msgType = jms.getPropertyAsString(MESSAGE_TYPE, null);
-            settings.setMessageType(msgType);
-
-            String factory = jms.getPropertyAsString(CONNECTION_FACTORY, null);
-            settings.setConnectionFactory(factory);
-
-            // Connection credentials are optional; they are only applied when configured.
-            ConfigMap connectionCredentials = jms.getPropertyAsMap(CONNECTION_CREDENTIALS, null);
-            if (connectionCredentials != null)
-            {
-                String username = connectionCredentials.getPropertyAsString(USERNAME, null);
-                settings.setConnectionUsername(username);
-                String password = connectionCredentials.getPropertyAsString(PASSWORD, null);
-                settings.setConnectionPassword(password);
-            }
-
-            ConfigMap deliverySettings = jms.getPropertyAsMap(DELIVERY_SETTINGS, null);
-            if (deliverySettings != null)
-            {
-                // Get the default delivery settings.
- DeliverySettings ds = settings.getDeliverySettings(); - - String mode = deliverySettings.getPropertyAsString(MODE, JMSConfigConstants.defaultMode); - ds.setMode(mode); - - long receiveIntervalMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_INTERVAL_MILLIS, defaultSyncReceiveIntervalMillis); - ds.setSyncReceiveIntervalMillis(receiveIntervalMillis); - - long receiveWaitMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_WAIT_MILLIS, defaultSyncReceiveWaitMillis); - ds.setSyncReceiveWaitMillis(receiveWaitMillis); - } - - String destJNDI = jms.getPropertyAsString(DESTINATION_JNDI_NAME, null); - settings.setDestinationJNDIName(destJNDI); - - String dest = jms.getPropertyAsString(DESTINATION_NAME, null); - if (dest != null && Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("The <destination-name> configuration option is deprecated and non-functional. Please remove this from your configuration file."); - - boolean durable = getDestination() instanceof MessageDestination ? - ((MessageDestination) getDestination()).getServerSettings().isDurable() : false; - settings.setDurableConsumers(durable); - - String deliveryMode = jms.getPropertyAsString(DELIVERY_MODE, null); - settings.setDeliveryMode(deliveryMode); - - boolean preserveJMSHeaders = jms.getPropertyAsBoolean(PRESERVE_JMS_HEADERS, settings.isPreserveJMSHeaders()); - settings.setPreserveJMSHeaders(preserveJMSHeaders); - - String defPriority = jms.getPropertyAsString(MESSAGE_PRIORITY, null); - if (defPriority != null && !defPriority.equalsIgnoreCase(DEFAULT_PRIORITY)) - { - int priority = jms.getPropertyAsInt(MESSAGE_PRIORITY, settings.getMessagePriority()); - settings.setMessagePriority(priority); - } - - String ackMode = jms.getPropertyAsString(ACKNOWLEDGE_MODE, defaultAcknowledgeMode); - settings.setAcknowledgeMode(ackMode); - - boolean transMode = jms.getPropertyAsBoolean(TRANSACTION_MODE, false); - if (transMode && Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("The <transacted-sessions> 
configuration option is deprecated and non-functional. Please remove this from your configuration file."); - - int maxProducers = jms.getPropertyAsInt(MAX_PRODUCERS, defaultMaxProducers); - settings.setMaxProducers(maxProducers); - - // Retrieve any JNDI initial context environment properties. - ConfigMap env = jms.getPropertyAsMap(INITIAL_CONTEXT_ENVIRONMENT, null); - if (env != null) - { - List props = env.getPropertyAsList(PROPERTY, null); - if (props != null) - { - Class contextClass = Context.class; - Hashtable envProps = new Hashtable(); - for (Iterator iter = props.iterator(); iter.hasNext();) - { - Object prop = iter.next(); - if (prop instanceof ConfigMap) - { - ConfigMap pair = (ConfigMap)prop; - String name = pair.getProperty(NAME); - String value = pair.getProperty(VALUE); - if (name == null || value == null) - { - // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements. - MessageException messageEx = new MessageException(); - messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[] {getDestination().getId()}); - throw messageEx; - } - // If the name is a Context field, use the - // constant value rather than this literal name. 
- if (name.startsWith("Context.")) - { - String fieldName = name.substring(name.indexOf('.') + 1); - java.lang.reflect.Field field = null; - try - { - field = contextClass.getDeclaredField(fieldName); - } - catch (NoSuchFieldException nsfe) - { - // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an invalid javax.naming.Context field for its <name>: {1} - MessageException messageEx = new MessageException(); - messageEx.setMessage(INVALID_CONTEXT_NAME, new Object[] {getDestination().getId(), fieldName}); - throw messageEx; - } - String fieldValue = null; - try - { - fieldValue = (String)field.get(null); - } - catch (IllegalAccessException iae) - { - // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an inaccessible javax.naming.Context field for its <name>: {1} - MessageException messageEx = new MessageException(); - messageEx.setMessage(INACCESIBLE_CONTEXT_NAME, new Object[] {getDestination().getId(), fieldName}); - throw messageEx; - } - envProps.put(fieldValue, value); - } - else - { - envProps.put(name, value); - } - } - else - { - // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements. - MessageException messageEx = new MessageException(); - messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[] {getDestination().getId()}); - throw messageEx; - } - } - settings.setInitialContextEnvironment(envProps); - } - else - { - // The <initial-context-environment> settings for the ''{0}'' destination does not include any <property> subelements. 
- MessageException messageEx = new MessageException(); - messageEx.setMessage(MISSING_PROPERTY_SUBELEMENT, new Object[] {getDestination().getId()}); - throw messageEx; - } - } - } - } - - private void stopConsumers(Collection<JMSConsumer> consumers) - { - Iterator<JMSConsumer> itr = consumers.iterator(); - while (itr.hasNext()) - { - JMSConsumer consumer = itr.next(); - // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' has been stopped. - MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP, new Object[] {consumer.getDestinationJndiName()}); - consumer.stop(true); - invalidateMessageClient(consumer, me.createErrorMessage()); - } - } -} -
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java b/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java deleted file mode 100644 index e792eb6..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -/** - * Constants for JMSAdapter and its related classes. 
- * - * - */ -public interface JMSConfigConstants -{ - // Values used in the messaging configuration - String ASYNC = "async"; - String ACKNOWLEDGE_MODE = "acknowledge-mode"; - String AUTO_ACKNOWLEDGE = "auto_acknowledge"; - String CLIENT_ACKNOWLEDGE = "client_acknowledge"; - String CONNECTION_FACTORY = "connection-factory"; - String CONNECTION_CREDENTIALS = "connection-credentials"; - String DELIVERY_SETTINGS = "delivery-settings"; - String DEFAULT_DELIVERY_MODE = "default_delivery_mode"; - String DEFAULT_PRIORITY = "default-priority"; - String DELIVERY_MODE = "delivery-mode"; - String DESTINATION_JNDI_NAME = "destination-jndi-name"; - String DESTINATION_NAME = "destination-name"; - String DESTINATION_TYPE = "destination-type"; - String DUPS_OK_ACKNOWLEDGE = "dups_ok_acknowledge"; - String INITIAL_CONTEXT_ENVIRONMENT = "initial-context-environment"; - String JMS = "jms"; - String MAX_PRODUCERS = "max-producers"; - String MAP_MESSAGE = "javax.jms.MapMessage"; - String MESSAGE_TYPE = "message-type"; - String MESSAGE_PRIORITY = "message-priority"; - String MODE = "mode"; - String NAME = "name"; - String NON_PERSISTENT = "non_persistent"; - String OBJECT_MESSAGE = "javax.jms.ObjectMessage"; - String PASSWORD = "password"; - String PERSISTENT = "persistent"; - String PRESERVE_JMS_HEADERS = "preserve-jms-headers"; - String PROPERTY = "property"; - String SYNC = "sync"; - String SYNC_RECEIVE_INTERVAL_MILLIS = "sync-receive-interval-millis"; - String SYNC_RECEIVE_WAIT_MILLIS = "sync-receive-wait-millis"; - String QUEUE = "queue"; - String TEXT_MESSAGE = "javax.jms.TextMessage"; - String TOPIC = "topic"; - String TRANSACTION_MODE = "transacted-sessions"; - String USERNAME = "username"; - String VALUE = "value"; - - // Flex message properties to translate to JMS - String TIME_TO_LIVE = "timeToLive"; - - // Standard JMS headers to translate to Flex - String JMS_CORRELATION_ID = "JMSCorrelationID"; - String JMS_DELIVERY_MODE = "JMSDeliveryMode"; - String JMS_DESTINATION = 
"JMSDestination"; - String JMS_EXPIRATION = "JMSExpiration"; - // public static final String JMS_MESSAGE_ID = "JMSMessageID"; - String JMS_PRIORITY = "JMSPriority"; - String JMS_REDELIVERED = "JMSRedelivered"; - String JMS_REPLY_TO = "JMSReplyTo"; - // public static final String JMS_TIMESTAMP = "JMSTimestamp"; - String JMS_TYPE = "JMSType"; - - // Defaults - String defaultAcknowledgeMode = AUTO_ACKNOWLEDGE; - String defaultDestinationType = TOPIC; - boolean defaultPreserveJMSHeaders = true; - long defaultSyncReceiveIntervalMillis = 100; - long defaultSyncReceiveWaitMillis = 0; - int defaultMaxProducers = 1; - String defaultMode = SYNC; - - // Errors - int MISSING_NAME_OR_VALUE = 10800; - int INVALID_CONTEXT_NAME = 10801; - int INACCESIBLE_CONTEXT_NAME = 10802; - int MISSING_PROPERTY_SUBELEMENT = 10803; - int MISSING_CONNECTION_FACTORY = 10804; - int INVALID_DESTINATION_TYPE = 10805; - // int CLIENT_NOT_SUBSCRIBED = 10806; - int MISSING_DESTINATION_JNDI_NAME = 10807; - int INVALID_ACKNOWLEDGE_MODE = 10808; - int INVALID_DELIVERY_MODE = 10809; - int NONSERIALIZABLE_MESSAGE_BODY = 10810; - int INVALID_JMS_MESSAGE_TYPE = 10811; - int NONMAP_MESSAGE_BODY = 10812; - int NON_QUEUE_DESTINATION = 10813; - int NON_QUEUE_FACTORY = 10814; - int NON_TOPIC_DESTINATION = 10815; - int NON_TOPIC_FACTORY = 10816; - int INVALID_DELIVERY_MODE_VALUE = 10817; - int ASYNC_MESSAGE_DELIVERY_NOT_SUPPORTED = 10818; - int DURABLE_SUBSCRIBER_NOT_SUPPORTED = 10819; - int CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR = 10820; - int CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL = 10821; - int CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP = 10822; - int MISSING_DURABLE_SUBSCRIPTION_NAME = 10823; - int JMSINVOCATION_EXCEPTION = 10824; -} - http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java ---------------------------------------------------------------------- diff --git 
a/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java b/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java deleted file mode 100644 index fa375f0..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.naming.NamingException; - -import flex.messaging.MessageException; -import flex.messaging.log.Log; - -/** - * A JMSProxy subclass for <code>javax.jms.MessageConsumer</code> instance. - */ -public abstract class JMSConsumer extends JMSProxy implements ExceptionListener -{ - /* JMS related variables */ - protected MessageConsumer consumer; - - protected MessageReceiver messageReceiver; - protected String selectorExpression; - - // Keep track whether MessageReceiver was set manually by the user or JMSAdapter. 
- // or automatically instantiated so appropriate error messages can be propagated - // in the former and supressed in the latter case. - private boolean messageReceiverManuallySet = false; - - /** - * The lock to use to guard all state changes for the JMSConsumer. - */ - protected Object lock = new Object(); - - /** - * The set of JMS message listeners to notify when a JMS message arrives. - */ - private final CopyOnWriteArrayList jmsMessageListeners = new CopyOnWriteArrayList(); - - /** - * The set of JMS exception listeners to notify when a JMS exception is thrown. - */ - private final CopyOnWriteArrayList jmsExceptionListeners = new CopyOnWriteArrayList(); - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Starts the <code>JMSConsumer</code>. Subclasses should call <code>super.start</code>. - * - * @throws NamingException The thrown naming exception. - * @throws JMSException The thrown JMS exception. - */ - public void start() throws NamingException, JMSException - { - super.start(); - - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName + "' is starting."); - } - - /** - * Stops the <code>JMSConsumer</code> by stopping its associated receiver - * adapter and closing the underlying <code>MessageConsumer</code>. It then - * calls <code>JMSProxy.close</code> for session and connection closure. 
- */ - public void stop() - { - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName + "' is stopping."); - - stopMessageReceiver(); - - try - { - if (consumer != null) - consumer.close(); - } - catch (JMSException e) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '" - + destinationJndiName + "' received an error while closing its underlying MessageConsumer: " - + e.getMessage()); - } - - super.stop(); - } - - /** - * Stops the <code>JMSConsumer</code> and unsubscribes a durable subscription - * if one exists. By default this method delegates to <code>stop()</code> - * and doesn't remove a durable subscription. - * - * @param unsubscribe Determines whether to unsubscribe a durable subscription - * if one exists, or not. - */ - public void stop(boolean unsubscribe) - { - stop(); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Adds a JMS message listener. - * - * @see flex.messaging.services.messaging.adapters.JMSMessageListener - * - * @param listener The listener to add. - */ - public void addJMSMessageListener(JMSMessageListener listener) - { - if (listener != null) - jmsMessageListeners.addIfAbsent(listener); - } - - /** - * Removes a JMS message listener. - * - * @see flex.messaging.services.messaging.adapters.JMSMessageListener - * - * @param listener The listener to remove. - */ - public void removeJMSMessageListener(JMSMessageListener listener) - { - if (listener != null) - jmsMessageListeners.remove(listener); - } - - /** - * Adds a JMS exception listener. - * - * @see flex.messaging.services.messaging.adapters.JMSExceptionListener - * - * @param listener The listener to add. 
- */ - public void addJMSExceptionListener(JMSExceptionListener listener) - { - if (listener != null) - jmsExceptionListeners.addIfAbsent(listener); - } - - /** - * Removes a JMS exception listener. - * - * @see flex.messaging.services.messaging.adapters.JMSExceptionListener - * - * @param listener The listener to remove. - */ - public void removeJMSExceptionListener(JMSExceptionListener listener) - { - if (listener != null) - jmsExceptionListeners.remove(listener); - } - - /** - * Sets the message listener of the underlying MessageConsumer. This method - * is not meant to be directly called as it is used internally by - * MessageReceivers that need to perform async message delivery. Any future - * custom MessageReceiver implementations can use this method to set themselves - * as MessageListeners to the underlying MessageConsumer. - * - * @param listener Message listener to set on the underlying MessageConsumer. - * @return The old message listener associated with the MessageConsumer. - * @throws JMSException The thrown JMS exception. - */ - public MessageListener setMessageListener(MessageListener listener) throws JMSException - { - MessageListener oldListener = consumer.getMessageListener(); - consumer.setMessageListener(listener); - return oldListener; - } - - /** - * Returns the <code>MessageReceiver</code> used by the consumer to retrieve - * JMS messages. - * - * @return The <code>MessageReceiver</code> used. - */ - public MessageReceiver getMessageReceiver() - { - return messageReceiver; - } - - /** - * Sets the <code>MessageReceiver</code> used by the consumer to retrieve - * JMS messages. This property should not change after startup. - * - * @param messageReceiver The <code>MessageReceiver</code> used. 
- */ - public void setMessageReceiver(MessageReceiver messageReceiver) - { - this.messageReceiver = messageReceiver; - messageReceiverManuallySet = true; - } - - /** - * Returns the selector expression used when the underlying - * <code>javax.jms.MessageConsumer</code> is created. - * - * @return The selector expression. - */ - public String getSelectorExpression() - { - return selectorExpression; - } - - /** - * Sets the selector expression used when the underlying - * <code>javax.jms.MessageConsumer</code> is created. This property should - * not change after startup. - * - * @param selectorExpression The selector expression. - */ - public void setSelectorExpression(String selectorExpression) - { - this.selectorExpression = selectorExpression; - } - - /** - * Implementation of javax.jms.ExceptionListener.onException. - * Dispatches the JMS exception to registered JMS exception listeners. - * - * @param exception The thrown JMS exception. - */ - public void onException(JMSException exception) - { - if (!jmsExceptionListeners.isEmpty()) - { - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. - for (Iterator iter = jmsExceptionListeners.iterator(); iter.hasNext();) - ((JMSExceptionListener)iter.next()).exceptionThrown(new JMSExceptionEvent(this, exception)); - } - } - - /** - * Acnowledges the receipt of the message to the JMS server and passes the - * message to registered JMS message listeners. - * - * @param jmsMessage The new JMS message to acknowledge and dispatch. - */ - public void onMessage(Message jmsMessage) - { - acknowledgeMessage(jmsMessage); - - if (!jmsMessageListeners.isEmpty()) - { - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. 
- for (Iterator iter = jmsMessageListeners.iterator(); iter.hasNext();) - ((JMSMessageListener)iter.next()).messageReceived(new JMSMessageEvent(this, jmsMessage)); - } - } - - /** - * Receive the next message from the underlying MessageConsumer or wait - * indefinetely until a message arrives if there is no message. - * - * @return The received JMS message. - * @throws JMSException The thrown JMS exception. - */ - public Message receive() throws JMSException - { - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread() - + " JMS consumer for JMS destination '" + destinationJndiName - + "' is waiting forever until a new message arrives."); - - return consumer.receive(); - } - - /** - * Receive the next message from the underlying MessageConsumer within the - * specified timeout interval. - * - * @param timeout The number of milliseconds to wait for a new message. - * @throws JMSException The thrown JMS exception. - */ - public Message receive(long timeout) throws JMSException - { - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread() - + " JMS consumer for JMS destination '" + destinationJndiName - + "' is waiting " + timeout + " ms for new message to arrive"); - - return consumer.receive(timeout); - } - - /** - * Receive the new message from the underlying MessageConsumer with no wait. - * - * @return The received JMS message. - * @throws JMSException The thrown JMS exception. - */ - public Message receiveNoWait() throws JMSException - { - return consumer.receiveNoWait(); - } - - //-------------------------------------------------------------------------- - // - // Protected and Private Methods - // - //-------------------------------------------------------------------------- - - /** - * Start the Message Receiver of the <code>JMSConsumer</code>. - * - * @throws JMSException The thrown JMS exception. 
- */ - void startMessageReceiver() throws JMSException - { - initializeMessageReceiver(); - messageReceiver.startReceive(); - connection.start(); - } - - /** - * Stops the Message Receiver of the <code>JMSConsumer</code>. - */ - void stopMessageReceiver() - { - if (messageReceiver != null) - messageReceiver.stopReceive(); - } - - /** - * Used internally to acknowledge the arrival of a message to the JMS server. - * - * @param message The JMS message to acknowledge. - */ - protected void acknowledgeMessage(Message message) - { - if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - try - { - message.acknowledge(); - } - catch (JMSException e) - { - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName + "' received an error in message acknowledgement: " + e.getMessage()); - } - } - } - - /** - * Initializes the message receiver used by the <code>JMSConsumer</code>. - * If the message receiver has been manually set, it validates the message - * receiver. Otherwise, it initalizes an async message receiver if it can, - * and falls back to sync message delivery if it cannot. - * - * This method should be called by subclasses once there is an underlying - * <code>javax.jms.MessageConsumer</code>. - */ - private void initializeMessageReceiver() - { - // If an AsyncMessageReceiver is manually set, make sure the app server - // allows MessageListener and ExceptionListener for JMS. - if (messageReceiverManuallySet && messageReceiver != null) - { - if (messageReceiver instanceof AsyncMessageReceiver) - { - String restrictedMethod = null; - try - { - // Test if MessageListener is restricted. - restrictedMethod = "javax.jms.MessageConsumer.setMessageListener"; - consumer.getMessageListener(); - - // Test if ExceptionListener is restricted. 
- restrictedMethod = "javax.jms.Connection.setExceptionListener"; - connection.setExceptionListener((AsyncMessageReceiver)messageReceiver); - - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName +"' is using async message receiver."); - } - catch (JMSException jmsEx) - { - // JMS consumer for JMS destination ''{0}'' is configured to use async message receiver but the application server does not allow ''{1}'' call used in async message receiver. Please switch to sync message receiver. - MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.ASYNC_MESSAGE_DELIVERY_NOT_SUPPORTED, new Object[] {destinationJndiName, restrictedMethod}); - throw me; - } - } - else if (messageReceiver instanceof SyncMessageReceiver) - { - SyncMessageReceiver smr = (SyncMessageReceiver)messageReceiver; - if (Log.isInfo()) - { - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName +"' is using sync message receiver" - + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis() - + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis()); - } - } - } - // If no MessageReceiver was manually set, set a default MessageReceiver - // with the following strategy: First try async message delivery. If the - // app server doesn't allow it, switch to sync message delivery. - else - { - try - { - messageReceiver = new AsyncMessageReceiver(this); - - // Test if MessageListener is restricted. - consumer.getMessageListener(); - // Test if ExceptionListener is restricted. 
- connection.setExceptionListener((AsyncMessageReceiver)messageReceiver); - - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName +"' is using async message receiver."); - } - catch (JMSException e) - { - SyncMessageReceiver smr = new SyncMessageReceiver(this); - smr.setSyncReceiveIntervalMillis(1); - smr.setSyncReceiveWaitMillis(-1); - messageReceiver = smr; - - if (Log.isInfo()) - { - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" - + destinationJndiName +"' is using sync message receiver" - + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis() - + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis()); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java b/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java deleted file mode 100644 index f809efb..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.EventObject; -import javax.jms.JMSException; - -/** - * Event dispatched to the JMSExceptionListener when a JMS exception is encountered - * by the source. - * - * @see flex.messaging.services.messaging.adapters.JMSExceptionListener - */ -public class JMSExceptionEvent extends EventObject -{ - private JMSException jmsException; - - /** - * Create a new JMSExceptionEvent with the source and exception. - * - * @param source The source of the exception. - * @param jmsException The actual JMS exception. - */ - JMSExceptionEvent(JMSConsumer source, JMSException jmsException) - { - super(source); - this.jmsException = jmsException; - } - - /** - * Return the JMS exception of the event. - * - * @return The JMS exception of the event. 
- */ - public JMSException getJMSException() - { - return jmsException; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java b/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java deleted file mode 100644 index 438e337..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.EventListener; - -/** - * An interface to be notified when a JMS exception is encountered by the JMS - * consumer. Implementations of this interface may add themselves as listeners - * via <code>JMSConsumer.addJMSExceptionListener</code>. - */ -public interface JMSExceptionListener extends EventListener -{ - /** - * Notification that a JMS exception was encountered. - * - * @param evt JMSExceptionEvent to dispatch. 
- */ - void exceptionThrown(JMSExceptionEvent evt); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java b/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java deleted file mode 100644 index 2d583e7..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.EventObject; -import javax.jms.Message; - -/** - * Event dispatched to the JMSMessageListener when a JMS message is received - * by the source. - * - * @see flex.messaging.services.messaging.adapters.JMSMessageListener - * - */ -public class JMSMessageEvent extends EventObject -{ - private Message message; - - /** - * Create a new JMSMessageEvent with the source and message. - * - * @param source The source of the message. - * @param jmsException The actual JMS message. 
- */ - JMSMessageEvent(JMSConsumer source, javax.jms.Message message) - { - super(source); - this.message = message; - } - - /** - * Return the JMS message of the event. - * - * @return The JMS message of the event. - */ - public Message getJMSMessage() - { - return message; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java b/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java deleted file mode 100644 index 671416d..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.EventListener; - -/** - * An interface to be notified when a JMS message is received by the JMS - * consumer. Implementations of this interface may add themselves as listeners - * via <code>JMSConsumer.addJMSMessageListener</code>. 
- */ -public interface JMSMessageListener extends EventListener -{ - /** - * Notification that a JMS message was received. - * - * @param evt JMSMessageEvent to dispatch. - */ - void messageReceived(JMSMessageEvent evt); -} \ No newline at end of file