Hello, 

That setup should certainly work. I'm not sure why the msgs aren't dequeued 
from the consumer broker the first time you connect your consumer.
At first I wondered whether the consumer acks the msgs at all, but it requests 
AUTO_ACKNOWLEDGE, so acking should occur.

I took your two broker configs and fired them up locally in my env. Then I 
sent a msg to a test queue on the transmit broker. The msg got enqueued on 
this broker. 
Only when I started a consumer on the receive broker was the msg forwarded to 
the receive broker, from where it got consumed correctly. 
All JMS counters were updated accordingly. Restarting the consumer did not 
redeliver the msg, as expected.

That made me check your client code once more and indeed there seems to be a 
problem. You call

> this.session = this.connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);

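The first argument you pass in is whether to use a transacted session or not. 
You are creating a transacted session but you don't seem to commit the 
transaction anywhere in your code. 
In a transacted session the acknowledge mode (the second argument) is ignored: 
consumed msgs are only acknowledged when the transaction is committed. Closing 
the session without a commit rolls the transaction back, so the broker never 
dequeues the msgs and redelivers them to the next consumer.
Can you change the first argument to false and try again? The msg should now be 
consumed only once.
If you want to use transactions, then you need to manually commit the tx 
(session.commit()) somewhere in your code, e.g. after processing each msg. 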


Hope that gets you going.


Torsten Mielke
[email protected]
[email protected]


On Oct 25, 2011, at 4:18 PM, kureckam wrote:

> I have two activemq brokers networked together. The producer broker (has
> static network xml tag) shows enqueued and dequeued values matching when
> consumer broker consumes the message, but the dequeued value on the consumer
> broker shows zero and if I rerun the consumer it receives all the messages
> again. Below is all the code I'm using to test this. Why is the consumer
> queue not removing the message from the queue?
> 
> // Producer activemq.xml
> <beans xmlns="http://www.springframework.org/schema/beans"
>  xmlns:amq="http://activemq.apache.org/schema/core"
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>  xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>        <property name="locations">
>            <value>file:${activemq.base}/conf/credentials.properties</value>
>        </property>      
>    </bean>
>    <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="transmitBroker" dataDirectory="${activemq.base}/data"
> destroyApplicationContextOnStop="true">
>        <destinationPolicy>
>            <policyMap>
>              <policyEntries>
>                <policyEntry topic=">" producerFlowControl="true"
> memoryLimit="1mb">
>                  <pendingSubscriberPolicy>
>                    <vmCursor />
>                  </pendingSubscriberPolicy>
>                </policyEntry>
>                <policyEntry queue=">" producerFlowControl="true"
> memoryLimit="1mb">
>                </policyEntry>
>              </policyEntries>
>            </policyMap>
>        </destinationPolicy> 
>        <managementContext>
>            <managementContext jmxDomainName="transmitDomainName"
> connectorPort="1098" />
>        </managementContext>
> 
>        <networkConnectors>
>           <networkConnector uri="static:(tcp://xx.xxx.x.xx:61619)"
> duplex="true" />
>        </networkConnectors>
>        <persistenceAdapter>
>            <kahaDB directory="${activemq.base}/data/transmit"/>
>        </persistenceAdapter>
>        <transportConnectors>
>           <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
>        </transportConnectors>
>    </broker>
>    <import resource="jetty.xml"/>
> </beans>
> 
> // MsgSenderTest.java
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> public class MsgSenderTest
> {
>   public static void main(final String[] args_)   {
>      if(args_.length != 4)      {
>         System.out.println("Required parameters;IP, port, Test number and
> number of messages");
>         System.exit(0);
>      }
> 
>      final ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("tcp://" + args_[0] + ":" + args_[1]);
> 
>      System.out.println("Connecting to ActiveMQ:" +
> connectionFactory.getBrokerURL());
> 
>      Connection connection = null;
>      Session startTopicSession = null;
>      MessageProducer startProducer = null;
> 
>      try      {
>         final int numberOfMessages = Integer.parseInt(args_[3]);
> 
>         connection = connectionFactory.createConnection();
>         connection.start();
> 
>         startTopicSession = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 
>         final Queue startQueue = startTopicSession.createQueue("Test" +
> args_[2]);
> 
>         startProducer = startTopicSession.createProducer(startQueue);
> 
>         for(int i = 0; i < numberOfMessages; i++)
>         {
>            System.out.println("Sending message #" + (i + 1));
>            startProducer.send(startTopicSession.createMessage());
> 
>            try            {               Thread.sleep(500);            }
>            catch(final Exception e2)            {}
>         }
> 
>         final Message message = startTopicSession.createMessage();
> 
>         message.setStringProperty("END", "");
>         startProducer.send(message);
>      }
>      catch(final Exception e)      {         e.printStackTrace();      }
>      finally
>      {
>         if(startProducer != null)
>         {
>            try            {               startProducer.close();           
> }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
> 
>         if(startTopicSession != null)
>         {
>            try            {               startTopicSession.close();          
>  
> }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
> 
>         if(connection != null)
>         {
>            try            {               connection.close();            }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
>      }
>   }
> }
> 
> // Consumer activemq.xml
> <beans xmlns="http://www.springframework.org/schema/beans"
>  xmlns:amq="http://activemq.apache.org/schema/core"
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>  xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>        <property name="locations">
>            <value>file:${activemq.base}/conf/credentials.properties</value>
>        </property>      
>    </bean>
> 
>    <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="receiveBroker" dataDirectory="${activemq.base}/data"
> destroyApplicationContextOnStop="true">
>        <destinationPolicy>
>            <policyMap>
>              <policyEntries>
>                <policyEntry topic=">" producerFlowControl="true"
> memoryLimit="1mb">
>                  <pendingSubscriberPolicy>
>                    <vmCursor />
>                  </pendingSubscriberPolicy>
>                </policyEntry>
>                <policyEntry queue=">" producerFlowControl="true"
> memoryLimit="1mb">
>                </policyEntry>
>              </policyEntries>
>            </policyMap>
>        </destinationPolicy> 
> 
>        <managementContext>
>            <managementContext jmxDomainName="receiveDomainName"
> connectorPort="1099" />
>        </managementContext>
> 
>         <networkConnectors>
>         </networkConnectors>
> 
>        <persistenceAdapter>
>            <kahaDB directory="${activemq.base}/data/receive"/>
>        </persistenceAdapter>
> 
>        <transportConnectors>
>           <transportConnector name="openwire" uri="tcp://0.0.0.0:61619"/>
>        </transportConnectors>
>    </broker>
>    <import resource="jetty.xml"/>
> </beans>
> 
> // MsgListenerTest.java
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> public class MsgListenerTest implements MessageListener
> {
>   private final ActiveMQConnectionFactory connectionFactory;
>   private Connection connection;
>   private Session session;
>   private MessageConsumer consumer;
> 
>   private int msgNumber = 1;
> 
>   public static void main(final String[] args_)
>   {
>      if(args_.length != 3){
>         System.out.println("Required parameters: IP, port, test number");
>         System.exit(0);
>      }
> 
>      new MsgListenerTest(args_);
>   }
> 
>   public MsgListenerTest(final String[] args_){
>      this.connectionFactory = new ActiveMQConnectionFactory("tcp://" +
> args_[0] + ":" + args_[1]);
> 
>      final String queueName = "Test" + args_[2];
> 
>      try
>      {
>         this.connection = this.connectionFactory.createConnection();
>         this.connection.start();
> 
>         this.session = this.connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> 
>         this.consumer =
> this.session.createConsumer(this.session.createQueue(queueName));
> 
>         this.consumer.setMessageListener(this);
>      }
>      catch(final Exception e){e.printStackTrace();}
>   }
> 
>   @Override
>   public void onMessage(final Message message_)
>   {
>      try
>      {
>         if(message_.getStringProperty("END") == null)
>         {
>            System.out.println("Received messaage #" + this.msgNumber);
> 
>            this.msgNumber++;
>         }
>         else
>         {
>            if(this.consumer != null)
>            {
>               try{this.consumer.close();}
>               catch(final JMSException e){e.printStackTrace();}
>            }
> 
>            if(this.session != null)
>            {
>               try{this.session.close();}
>               catch(final JMSException e){e.printStackTrace();}
>            }
> 
>            if(this.connection != null)
>            {
>               try{this.connection.close();}
>               catch(final JMSException e) {e.printStackTrace();}
>            }
>         }
>      }
>      catch(final JMSException e2){e2.printStackTrace();}
>   }
> }
> 
> --
> View this message in context: 
> http://activemq.2283324.n4.nabble.com/Messages-not-being-removed-from-networked-queue-tp3936864p3936864.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.




Reply via email to