[ https://issues.apache.org/jira/browse/AMQ-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tomas Pavelka updated AMQ-6863:
-------------------------------
    Description:
I am using a file based cursor with durable topic subscriptions because in my tests the broker would run out of memory when dealing with large numbers of messages without an active consumer.

I have run into a NullPointerException when the messages meant for the topic with an active durable subscription expire. Here is part of the stack trace:

java.lang.NullPointerException: null
    at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:586) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:810) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.discardExpiredMessage(FilePendingMessageCursor.java:489) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.tryAddMessageLast(FilePendingMessageCursor.java:247) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor.addMessageLast(AbstractPendingMessageCursor.java:93) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:157) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:279) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.Topic$2.recoverMessage(Topic.java:314) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$6.execute(KahaDBStore.java:1012) [activemq-kahadb-store-5.15.2.jar:5.15.2]
    at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779) [activemq-kahadb-store-5.15.2.jar:5.15.2]
    at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:999) [activemq-kahadb-store-5.15.2.jar:5.15.2]
    at org.apache.activemq.store.ProxyTopicMessageStore.recoverSubscription(ProxyTopicMessageStore.java:108) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.Topic.activate(Topic.java:307) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:123) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:164) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.TopicRegion.addSubscriptionsForDestination(TopicRegion.java:287) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:162) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:339) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:239) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:104) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:200) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:119) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerService$6.start(BrokerService.java:2370) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:747) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:733) [activemq-broker-5.15.2.jar:5.15.2]
    at org.apache.activemq.broker.BrokerService.start(BrokerService.java:636) [activemq-broker-5.15.2.jar:5.15.2]

I looked at the code and it seems to me that this is caused by the method org.apache.activemq.broker.region.cursors.FilePendingMessageCursor#discardExpiredMessage:

    private void discardExpiredMessage(MessageReference reference) {
        LOG.debug("Discarding expired message {}", reference);
        if (reference.isExpired() && broker.isExpired(reference)) {
            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
            context.setBroker(broker);
            ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
        }
    }

There the subscription passed to Destination#messageExpired is set to null. If the destination is a topic, then later in org.apache.activemq.broker.region.Topic#acknowledge:

    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

The code dsub.getSubscriptionKey() throws an NPE.
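
To illustrate the failure mode in isolation, here is a minimal standalone sketch. It does not use any ActiveMQ classes: ExpiredAckSketch, Subscription, DurableSub, acknowledgeUnguarded and acknowledgeGuarded are all hypothetical stand-ins that only mirror the shape of Topic#acknowledge as quoted above. It shows that casting a null reference succeeds and the NPE only appears on the following dereference, and it shows one possible guard. The guard is an assumption for illustration, not the project's fix; whether skipping the store acknowledgement is the right broker behaviour is a separate question that this sketch does not answer.

```java
/**
 * Standalone sketch of the reported NPE. All types here are hypothetical
 * stand-ins, not ActiveMQ classes.
 */
public class ExpiredAckSketch {

    /** Hypothetical stand-in for a broker subscription. */
    public interface Subscription {}

    /** Hypothetical stand-in for a durable topic subscription. */
    public static final class DurableSub implements Subscription {
        private final String clientId;
        private final String subscriptionName;

        public DurableSub(String clientId, String subscriptionName) {
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
        }

        /** Plays the role of the subscription key used for the store ack. */
        public String key() {
            return clientId + ":" + subscriptionName;
        }
    }

    /**
     * Mirrors the reported pattern: cast, then dereference without a check.
     * Returns the key that would be acknowledged in the store, or null when
     * the message is not persistent.
     */
    public static String acknowledgeUnguarded(Subscription sub, boolean persistent) {
        if (persistent) {
            DurableSub dsub = (DurableSub) sub; // casting null does not throw
            return dsub.key();                  // NPE here when sub is null
        }
        return null;
    }

    /**
     * One possible guard (an assumption, not the actual fix): only touch the
     * key when a durable subscription was really passed in. instanceof is
     * false for null, so the null case falls through.
     */
    public static String acknowledgeGuarded(Subscription sub, boolean persistent) {
        if (persistent && sub instanceof DurableSub) {
            return ((DurableSub) sub).key();
        }
        return null; // no store acknowledgement attempted
    }

    public static void main(String[] args) {
        Subscription durable = new DurableSub("client-1", "sub-1");
        System.out.println(acknowledgeUnguarded(durable, true)); // client-1:sub-1
        System.out.println(acknowledgeGuarded(null, true));      // null
        try {
            acknowledgeUnguarded(null, true);
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the report");
        }
    }
}
```

In the sketch the guarded variant simply returns without acknowledging anything. In the real broker the expired message would presumably still need to be removed from the store for every durable subscriber, so a null check alone may not be a complete fix.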
> NPE when expiring messages with FilePendingMessageCursor and durable topic subscriptions
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6863
>                 URL: https://issues.apache.org/jira/browse/AMQ-6863
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Tomas Pavelka

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)