I'm seeing a weird issue in an AMQ installation. 

Basically, I can see with tcpdump that a JMS message gets published to
ActiveMQ, but if that message is of a particular type, ActiveMQ
doesn't forward it on to its (STOMP) subscribers. 

AMQ version: 5.5.1
Setup: 
Glassfish v3.1.1, connected via activemq-rar-5.5.1

The message is published via Java code using an injected ConnectionFactory.
In all cases I see the message counter on the ActiveMQ admin web UI
increase, but in some cases (if attribute "smileMessageType" =
"SubscriptionClosed") the message does not become available to STOMP
subscribers, and subsequent messages also do not get delivered to STOMP
subscribers. I've been unable to test whether JMS subscribers get the message
because I can't get one working :-)

Can anyone offer any advice on what might be causing the issue? Is it likely
to be something I'm doing wrong, or a configuration error?

thanks in advance for any help :-D

here's my publisher code:

##### SubscriptionMsgBridgeListener.java #####

package com.inomial.smile.msg.listeners;

import java.io.StringWriter;
import java.util.Enumeration;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;

import com.inomial.smile.schema.v2.ObjectFactory;
import com.inomial.smile.schema.v2.SubscriptionBodyType;
import com.inomial.smile.schema.v2.SubscriptionFeaturesType;
import com.inomial.smile.schema.v2.SubscriptionProvisionRequestType;
import com.inomial.smile.schema.v2.SubscriptionProvisionResponseType;

@MessageDriven(
    mappedName = "jms/SmileSubscriptionTopic"
)
public class SubscriptionMsgBridgeListener implements MessageListener
{
  private static Logger LOG =
Logger.getLogger(SubscriptionMsgBridgeListener.class.getName());
  
  @Resource(mappedName="jms/ExternalBroker")
  private ConnectionFactory connectionFactory;
  
  @Resource(mappedName="jms/ExternalSmileSubscriptionTopic")
  private Topic topic;
  
  // lazily loaded connection
  private Connection connection;

  @PreDestroy
  void cleanUp()
  {
    if (connection != null)
      try
      {
        connection.close();
      } catch (JMSException e)
      {
        LOG.log(Level.WARNING, "Unable to close connection in cleanup", e);
      }
    connection = null;
  }

  @SuppressWarnings("unchecked") /* JMS Message.getPropertyNames()
necessitates this */
  @Override
  public void onMessage(Message msg)
  {
    System.out.println("SubMsgBridge is working");
    Connection c = null;
    Session session = null;
    
    try
    {
      c = getConnection();
      session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);

      ObjectMessage messageIn = (ObjectMessage) msg;
      TextMessage messageOut = session.createTextMessage();

      /* copy all properties between messages */
      for (Enumeration<String> e = messageIn.getPropertyNames();
e.hasMoreElements() ;)
      {
        String propertyName = e.nextElement();
        messageOut.setStringProperty(propertyName,
messageIn.getStringProperty(propertyName));
      }

      /* If there is a subscription object, marshall it as XML and set it as
the body */
      addBody(messageIn, messageOut);

      session.createProducer(topic).send(messageOut);
      
//      if (LOG.isLoggable(Level.FINE))
//      {
        logMessageSent(messageOut);
//      }
      System.out.println("SubMsgBridge is working");
    } 
    catch (JMSException e)
    {
      throw new RuntimeException("Error reading subscription object", e);
    } 
    catch (JAXBException e)
    {
      throw new RuntimeException("Error serializing XML", e);
    }
    finally
    {
      try
      {
        if (session != null)      
          session.close();
      } catch (JMSException e)
      {
        // Not much we can do about this!
        e.printStackTrace();
      }
    }
 }

  private void addBody(ObjectMessage messageIn, TextMessage messageOut)
throws JMSException, JAXBException
  {
    Object body = messageIn.getObject();
    
    if (body == null)
      return;
    
    StringWriter stringWriter = new StringWriter();      
    JAXBContext jaxbContext =
JAXBContext.newInstance("com.inomial.smile.schema.v2");
    Marshaller marshaller = jaxbContext.createMarshaller();
    if (body instanceof SubscriptionProvisionRequestType)
    {
      marshaller.marshal(new
ObjectFactory().createSubscriptionProvisionStatusUpdate((SubscriptionProvisionRequestType)body),
stringWriter);
    } 
    else if (body instanceof SubscriptionFeaturesType)
    {
      marshaller.marshal(new
ObjectFactory().createSubscriptionFeaturesUpdate((SubscriptionFeaturesType)
body), stringWriter);
    }
    else if (body instanceof SubscriptionBodyType)
    {
      marshaller.marshal(new
ObjectFactory().createSubscriptionUpdated((SubscriptionBodyType) body),
stringWriter);
    }
    else if (body instanceof SubscriptionProvisionResponseType)
    {
      marshaller.marshal(new
ObjectFactory().createSubscriptionProvisionResponse((SubscriptionProvisionResponseType)
body), stringWriter);
    }
      
    messageOut.setText(stringWriter.toString());
  }
  
  private void logMessageSent(TextMessage message) throws JMSException
  {
    StringBuilder builder = new StringBuilder();
    builder.append("Sent External Message: ");
    for (Enumeration<String> e = message.getPropertyNames();
e.hasMoreElements() ;)
    {
      String name = e.nextElement();
      builder.append(name).append(" =
").append(message.getStringProperty(name)).append("\n");
    }
    builder.append("body:\n").append(message.getText());
    
    LOG.log(Level.INFO, builder.toString());
  }

  private Connection getConnection() throws JMSException
  {
    if (connection == null)
      connection = connectionFactory.createConnection();
    return connection;
  }

}

############

Cheers, 

-- David

--
View this message in context: 
http://activemq.2283324.n4.nabble.com/JMS-goes-in-sometimes-STOMP-comes-out-but-not-always-tp4416098p4416098.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to