[ 
https://issues.apache.org/jira/browse/AMQ-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264227#comment-16264227
 ] 

Tomas Pavelka commented on AMQ-6863:
------------------------------------

Here is why we can only achieve what we want with a file cursor, despite the 
file cursor saving to disk twice:
We have a group of topics (let's call them LIMITED.TOPIC) with durable 
subscribers that we want to limit by policies. For example, we set a policy 
that these topics can only use 70% of storage so that other destinations can 
still use the broker even if LIMITED.TOPIC fills up storage up to the limit 
allowed by policy.
We can run into a situation where a durable consumer disconnects. We are using 
a hub and spoke topology, so when the consumer stays disconnected for long 
enough, the storage on both the hub and the sending spoke broker eventually 
fills up. Here is a picture:

spoke_broker(LIMITED.TOPIC:full) -> hub_broker(LIMITED.TOPIC:full)

In this situation, hub_broker has an inactive durable subscription from the 
disconnected consumer, while spoke_broker has an active subscription from 
hub_broker. As a result, spoke_broker puts messages from LIMITED.TOPIC into 
memory *up to the global limit* (as opposed to the high watermark set by 
policy). 
spoke_broker then cannot process any other messages because its memory is 
full.
The only way we were able to solve this is by using a file cursor, which (I 
guess) respects policy watermarks. But the file cursor has the message 
expiration bug described in this issue.

> NPE when expiring messages with FilePendingMessageCursor and durable topic 
> subscriptions
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6863
>                 URL: https://issues.apache.org/jira/browse/AMQ-6863
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Tomas Pavelka
>         Attachments: TestCursors.java
>
>
> I am using a file based cursor with durable topic subscriptions because in my
> tests the broker would run out of memory when dealing with large numbers of
> messages without an active consumer.
> I have run into a NullPointerException when the messages meant for the topic
> with an active durable subscription expire. Here is part of the stack trace:
> java.lang.NullPointerException: null
>         at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:586) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:810) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.discardExpiredMessage(FilePendingMessageCursor.java:489) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.tryAddMessageLast(FilePendingMessageCursor.java:247) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor.addMessageLast(AbstractPendingMessageCursor.java:93) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:157) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:279) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic$2.recoverMessage(Topic.java:314) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$6.execute(KahaDBStore.java:1012) [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779) [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:999) [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at org.apache.activemq.store.ProxyTopicMessageStore.recoverSubscription(ProxyTopicMessageStore.java:108) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.activate(Topic.java:307) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:123) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:164) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.TopicRegion.addSubscriptionsForDestination(TopicRegion.java:287) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:162) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:339) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:239) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:104) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:200) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:119) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerService$6.start(BrokerService.java:2370) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:747) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:733) [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerService.start(BrokerService.java:636) [activemq-broker-5.15.2.jar:5.15.2]
> I looked at the code and it seems to me that this is caused by the method
> org.apache.activemq.broker.region.cursors.FilePendingMessageCursor#discardExpiredMessage:
>     private void discardExpiredMessage(MessageReference reference) {
>         LOG.debug("Discarding expired message {}", reference);
>         if (reference.isExpired() && broker.isExpired(reference)) {
>             ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
>             context.setBroker(broker);
>             ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
>         }
>     }
> There the subscription passed to Destination#messageExpired is set to null.
> If the destination is a topic, then later in
> org.apache.activemq.broker.region.Topic#acknowledge:
>     @Override
>     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
>             final MessageReference node) throws IOException {
>         if (topicStore != null && node.isPersistent()) {
>             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
>             SubscriptionKey key = dsub.getSubscriptionKey();
>             topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
>                     convertToNonRangedAck(ack, node));
>         }
>         messageConsumed(context, node);
>     }
> The call dsub.getSubscriptionKey() throws an NPE because dsub is null.
> I suspect that this problem also applies to the default
> org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor, which
> uses org.apache.activemq.broker.region.cursors.FilePendingMessageCursor
> internally, but so far I have not been able to reproduce it.
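
To illustrate the failure mode described above without needing a broker, here 
is a minimal self-contained sketch. It is not ActiveMQ code: DurableSub, 
storeAcks and both acknowledge methods are hypothetical stand-ins that only 
mirror the shape of the quoted Topic#acknowledge. The guarded variant is one 
possible defensive shape under that assumption and says nothing about how the 
issue was or should be fixed in ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSubscriptionSketch {

    /** Hypothetical stand-in for a durable subscription (not the ActiveMQ class). */
    static final class DurableSub {
        final String clientId;
        final String subscriptionName;

        DurableSub(String clientId, String subscriptionName) {
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
        }
    }

    /** Hypothetical stand-in for the topic store: records each acknowledgement. */
    static final List<String> storeAcks = new ArrayList<>();

    /** Same shape as the quoted Topic#acknowledge: the subscription is dereferenced unconditionally. */
    static void acknowledgeUnguarded(Object sub, String messageId) {
        DurableSub dsub = (DurableSub) sub; // casting null succeeds and yields null
        // Dereferencing dsub throws a NullPointerException when sub was null.
        storeAcks.add(dsub.clientId + "/" + dsub.subscriptionName + "/" + messageId);
    }

    /** Assumed defensive variant: skips the store acknowledgement when no durable subscription is given. */
    static boolean acknowledgeGuarded(Object sub, String messageId) {
        if (!(sub instanceof DurableSub)) {
            return false; // null (or any other type) never reaches the dereference
        }
        DurableSub dsub = (DurableSub) sub;
        storeAcks.add(dsub.clientId + "/" + dsub.subscriptionName + "/" + messageId);
        return true;
    }

    public static void main(String[] args) {
        try {
            // Mimics the cursor passing null as the subscription for an expired message.
            acknowledgeUnguarded(null, "msg-1");
            System.out.println("unguarded: no exception");
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded, null subscription: "
                + acknowledgeGuarded(null, "msg-1"));
        System.out.println("guarded, durable subscription: "
                + acknowledgeGuarded(new DurableSub("client-1", "sub-1"), "msg-2"));
    }
}
```

The point of the sketch is only that the cast itself does not fail on null; 
the exception comes from the first dereference afterwards, which matches the 
top frame of the stack trace.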



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
