Broker config: 

  <amq:broker persistent="true" brokerName="${activemq.broker.name}"
        useJmx="true" offlineDurableSubscriberTimeout="172800000"
        offlineDurableSubscriberTaskSchedule="3600000"
advisorySupport="false">

        <amq:persistenceAdapter>
            <amq:replicatedLevelDB
                directory="${activemq.data.dir}"
                replicas="1"
                bind="tcp://0.0.0.0:0"
                zkAddress="zk1.x.x.x:2181,zk2.x.x.x:2181,zk3.x.x.x:2181"
                zkPath="${activemq.zkpath}"
                zkSessionTimeout="35s"
                sync="quorum_mem"
         />

        <amq:plugins>
            <amq:discardingDLQBrokerPlugin dropAll="true"/>
            <amq:loggingBrokerPlugin logAll="false"
logConnectionEvents="true"
                logConsumerEvents="false" logProducerEvents="false"/>
        </amq:plugins>

        <amq:transportConnectors>
            <amq:transportConnector name="mqbrokerTcpNioTransport" 
                  
uri="nio://0.0.0.0:61616?wireFormat.maxInactivityDuration=120000"
                   allowLinkStealing="true" />
        </amq:transportConnectors>


Consumer code: 


public class ActiveMQReceiver implements MessageListener
{
    private static final Logger LOG =
LoggerFactory.getLogger(ActiveMQReceiver.class);
    public void start()
    {
        try {
            ConnectionFactory connectionFactory =
                    new
ActiveMQConnectionFactory("failover://(tcp://mqbroker:61616)");
            Connection myConnection = connectionFactory.createConnection();
            myConnection.setClientID("C-1");
            myConnection.start();
            Session mySession = myConnection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Destination destination = mySession.createTopic("Test");
            MessageConsumer consumer =
mySession.createDurableSubscriber((Topic)destination, "C-1");
            consumer.setMessageListener(this);
        }   
        catch(Exception e ) {
            LOG.warn("Exception :\\n" + e.getMessage() + " \n Cause"  +
e.getCause()
                + "\nException = " + e);
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args)
    {
        ActiveMQReceiver poller = new ActiveMQReceiver();
        poller.start();
    }

    @Override
    public void onMessage(Message arg0) 
    {       
    }
}



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/AllowLinkStealing-does-not-work-for-TCP-transport-with-leveldb-persistent-store-tp4709667p4709778.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to