I currently have two consumers set up, each consuming from a queue. I have
disabled producer flow control (setProducerFlowControl(false)) and set
setMemoryLimit to 1, queuePrefetch to 10, and topicPrefetch to 10.

I added an infinite sleep to consumer1 and ran a load test with 900000
events. I expected the events for consumer1 to be discarded and consumer2
to consume all events. Instead, both consumers are now blocked, and all the
events are retained in both queues: each queue holds 900000 messages.
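
To make the expectation concrete, here is a toy model in plain Java. It uses
no ActiveMQ classes: ExpectedBehaviourModel, BoundedQueue, and run are
hypothetical names made up for this illustration, and the pending limit of 1
mirrors the setLimit(1) call in my config. It only describes the behaviour I
expected (oldest events evicted for the stalled consumer), not what the
broker actually does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ExpectedBehaviourModel {

    /** Hypothetical per-consumer queue: keeps at most `limit` pending events. */
    static final class BoundedQueue {
        private final int limit;
        private final boolean stalled;
        private final Deque<Integer> pending = new ArrayDeque<>();
        long consumed = 0;
        long discarded = 0;

        BoundedQueue(int limit, boolean stalled) {
            this.limit = limit;
            this.stalled = stalled;
        }

        void offer(int event) {
            pending.addLast(event);
            if (stalled) {
                // Stalled consumer never takes anything: evict the oldest
                // events once the pending limit is exceeded.
                while (pending.size() > limit) {
                    pending.pollFirst();
                    discarded++;
                }
            } else {
                // Healthy consumer drains everything that is pending.
                while (!pending.isEmpty()) {
                    pending.pollFirst();
                    consumed++;
                }
            }
        }

        int pendingSize() {
            return pending.size();
        }
    }

    /** Returns {c1Pending, c1Discarded, c1Consumed, c2Pending, c2Discarded, c2Consumed}. */
    static long[] run(int events) {
        BoundedQueue consumer1 = new BoundedQueue(1, true);   // infinite sleep
        BoundedQueue consumer2 = new BoundedQueue(1, false);  // healthy
        for (int i = 0; i < events; i++) {
            // Assumption: every event is copied to each consumer's queue.
            consumer1.offer(i);
            consumer2.offer(i);
        }
        return new long[] {
            consumer1.pendingSize(), consumer1.discarded, consumer1.consumed,
            consumer2.pendingSize(), consumer2.discarded, consumer2.consumed
        };
    }

    public static void main(String[] args) {
        long[] r = run(900_000);
        System.out.println("consumer1: pending=" + r[0] + " discarded=" + r[1] + " consumed=" + r[2]);
        System.out.println("consumer2: pending=" + r[3] + " discarded=" + r[4] + " consumed=" + r[5]);
    }
}
```

In this model the stalled consumer's queue never grows past the limit while
the healthy consumer sees every event, which is the opposite of what I
observe (both queues at 900000).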

I have pasted my code snippet below. Can someone please guide me? Thanks.

        /* configure activemq broker. */
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(true);
        broker.setAdvisorySupport(false);
        PolicyMap policyMap = new PolicyMap();
        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopic(">");
        topicPolicy.setProducerFlowControl(false);
        entries.add(topicPolicy);

        PolicyEntry queuePolicy = new PolicyEntry();
        /* if this is true and consumers are slow, producers will be
         * throttled and, in the worst case, halted */
        queuePolicy.setProducerFlowControl(false);
        /* apply this policy to all queues */
        queuePolicy.setQueue(">");

        queuePolicy.setMemoryLimit(1);
        ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy =
                new ConstantPendingMessageLimitStrategy();
        constantPendingMessageLimitStrategy.setLimit(1);
        queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);

        OldestMessageEvictionStrategy oldestMessageEvictionStrategy =
                new OldestMessageEvictionStrategy();
        oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1);
        queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy);
        /* send an advisory message if a consumer is deemed slow */
        queuePolicy.setAdvisoryForSlowConsumers(false);
        /* the period (in ms) between checks for message expiry on queued
         * messages; a value of 0 disables the check */
        queuePolicy.setExpireMessagesPeriod(1000);
        /* Set the prefetch size for all queues. You can override this value
         * when creating a consumer. */
        // policy.setTopicPrefetch(10);
        queuePolicy.setQueuePrefetch(10);
        entries.add(queuePolicy);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);

        /* All undeliverable messages will be sent to ActiveMQ.DLQ, which has
         * a fixed size. If it reaches that size, producers will be throttled.
         * Drop the dead letter queue. Enable it on a case-by-case basis. */
        // DiscardingDLQBrokerPlugin dlqBrokerPlugin = new DiscardingDLQBrokerPlugin();
        // dlqBrokerPlugin.setDropAll(true);
        // dlqBrokerPlugin.setDropTemporaryTopics(true);
        // dlqBrokerPlugin.setDropTemporaryQueues(true);
        // BrokerPlugin[] plugins = { dlqBrokerPlugin };
        // broker.setPlugins(plugins);

        VirtualTopic virtualTopic = new VirtualTopic();
        // the new config that enables selectors on the interceptor
        virtualTopic.setSelectorAware(true);
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        interceptor.setVirtualDestinations(new VirtualDestination[] { virtualTopic });
        broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
        broker.start();



