Currently I have two consumers set up, each consuming from a queue. I have disabled producer flow control (setProducerFlowControl(false)), called setMemoryLimit(1), and set both the queue prefetch and the topic prefetch to 10.
I added an infinite sleep in consumer1 and ran a load test with 900000 events. I expected consumer1 to discard all events and consumer2 to consume all of them. However, both consumers are now blocked. Also, all the events are retained in both queues: each queue's size is 900000. I have pasted my code snippet below. Can someone please guide me? Thanks.

/* Configure the ActiveMQ broker. */
broker.setDeleteAllMessagesOnStartup(true);
broker.setUseJmx(true);
broker.setAdvisorySupport(false);

PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();

/* Policy for all topics. */
PolicyEntry topicPolicy = new PolicyEntry();
topicPolicy.setTopic(">");
topicPolicy.setProducerFlowControl(false);
entries.add(topicPolicy);

/* Policy for all queues. */
PolicyEntry queuePolicy = new PolicyEntry();
/* If this is true and consumers are slow, producers will be throttled and, in the worst case, halted. */
queuePolicy.setProducerFlowControl(false);
/* Apply this policy to all queues. */
queuePolicy.setQueue(">");
queuePolicy.setMemoryLimit(1);

ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
constantPendingMessageLimitStrategy.setLimit(1);
queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);

OldestMessageEvictionStrategy oldestMessageEvictionStrategy = new OldestMessageEvictionStrategy();
oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1);
queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy);

/* Whether to send an advisory message if a consumer is deemed slow. */
queuePolicy.setAdvisoryForSlowConsumers(false);
/* The period (in ms) between checks for message expiry on queued messages; a value of 0 disables the check. */
queuePolicy.setExpireMessagesPeriod(1000);
/* Set the prefetch size. You can override this value when creating a consumer. */
// policy.setTopicPrefetch(10);
queuePolicy.setQueuePrefetch(10);
entries.add(queuePolicy);

policyMap.setPolicyEntries(entries);
broker.setDestinationPolicy(policyMap);

/*
 * All undeliverable messages will be sent to ActiveMQ.DLQ, which has a fixed size.
 * If it reaches that size, producers will be throttled.
 * Drop the dead letter queue. Enable it on a case-by-case basis.
 */
// DiscardingDLQBrokerPlugin dlqBrokerPlugin = new DiscardingDLQBrokerPlugin();
// dlqBrokerPlugin.setDropAll(true);
// dlqBrokerPlugin.setDropTemporaryTopics(true);
// dlqBrokerPlugin.setDropTemporaryQueues(true);
// BrokerPlugin[] plugins = { dlqBrokerPlugin };
// broker.setPlugins(plugins);

VirtualTopic virtualTopic = new VirtualTopic();
// The new config that enables selectors on the interceptor.
virtualTopic.setSelectorAware(true);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[] { virtualTopic });
broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });

broker.start();

--
View this message in context: http://activemq.2283324.n4.nabble.com/Random-slow-Subscribers-causing-Topic-to-full-Solution-tp4664784p4664856.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.