Message getting stuck on queue, leading to KahaDB log files not being deleted 
and disk running out of space
-----------------------------------------------------------------------------------------------------------

                 Key: AMQ-2955
                 URL: https://issues.apache.org/activemq/browse/AMQ-2955
             Project: ActiveMQ
          Issue Type: Bug
          Components: Message Store
    Affects Versions: 5.4.1
         Environment: Red Hat Enterprise Linux 5
            Reporter: Peter Blackburn
            Priority: Critical


Using the following test client, we see a single message getting stuck on the 
queue. This then prevents the KahaDB log files from being cleaned up. Once this 
message gets stuck, we need to restart the broker before it can be consumed. 
This is a showstopper for us: when this occurs in our system, the large number 
of messages that we produce and consume each second causes the disk to run out 
of space within an hour. We also see the same behaviour with synchronous 
sending and without failover.

This doesn't happen every time with the test client - the most reliable way I 
have found to reproduce it is to start the broker and wait for the first 
MessageDatabase checkpoint to finish before starting the test client. 
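
The wait for the first checkpoint could be automated with something like the 
sketch below. It assumes the broker runs with DEBUG logging, so that a 
"MessageDatabase [DEBUG] Checkpoint done." line (as in the log excerpts later 
in this report) appears in the broker log. The class name CheckpointWatcher 
and the log path argument are hypothetical and not part of the original test 
set up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

public class CheckpointWatcher {
    // Returns true if any line reports a finished MessageDatabase checkpoint.
    public static boolean firstCheckpointDone(List<String> logLines) {
        for (String line : logLines) {
            if (line.contains("MessageDatabase") && line.contains("Checkpoint done.")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Assumption: args[0] is the path to the broker log file.
        Path log = Paths.get(args[0]);
        // Re-read the log once a second until the first checkpoint has finished.
        while (!firstCheckpointDone(Files.readAllLines(log))) {
            Thread.sleep(1000);
        }
        System.out.println("First checkpoint finished; start the test client now.");
    }
}
```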

{code:title=Test Client}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Test {
    public static final void main(String[] args) throws Exception {
        ConnectionFactory cf = new ActiveMQConnectionFactory(
                "failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
        final Connection producerConn = cf.createConnection();
        final Connection consumerConn = cf.createConnection();

        // One message ID per line, so the two logs can be diffed afterwards.
        final BufferedWriter producerLog = new BufferedWriter(new FileWriter("produced.log"));
        final BufferedWriter consumerLog = new BufferedWriter(new FileWriter("consumed.log"));

        // Producer: sends 100000 messages of 1 KB random data to TEST_QUEUE.
        new Thread(new Runnable() {
            public void run() {
                try {
                    producerConn.start();
                    Session session = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    Queue queue = session.createQueue("TEST_QUEUE");
                    MessageProducer producer = session.createProducer(queue);
                    Random random = new Random();
                    byte[] messageBytes = new byte[1024];

                    for (int i = 0; i < 100000; i++) {
                    //while (true) {
                        random.nextBytes(messageBytes);
                        Message message = session.createObjectMessage(messageBytes);
                        producer.send(message);
                        producerLog.write(message.getJMSMessageID());
                        producerLog.newLine();
                        producerLog.flush();
                    }
                    System.out.println("Produced 100000 messages...");
                    producerLog.close();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println("Started producer...");

        // Consumer: acknowledges each message and logs its ID.
        new Thread(new Runnable() {
            public void run() {
                try {
                    consumerConn.start();
                    Session session = consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    Queue queue = session.createQueue("TEST_QUEUE");
                    MessageConsumer consumer = session.createConsumer(queue);
                    consumer.setMessageListener(new MessageListener() {
                        public void onMessage(Message message) {
                            try {
                                message.acknowledge();
                                consumerLog.write(message.getJMSMessageID());
                                consumerLog.newLine();
                                consumerLog.flush();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println("Started consumer...");
    }
}
{code}

After the 100,000 messages have been produced, we can see the following 
difference in the log files:

{noformat}
[pblackb...@xxxx test]$ diff produced.log consumed.log
10394d10393
< ID:xxxx-35451-1285948546531-0:0:1:1:10394
[pblackb...@xxxx test]$
{noformat}
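
The same comparison can be done without relying on line order. The following 
is a minimal sketch, assuming the two log files written by the test client 
(produced.log and consumed.log, one message ID per line) are in the working 
directory. The class name MissingMessages is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingMessages {
    // Returns the IDs that were produced but never consumed, in production order.
    public static List<String> missing(List<String> produced, List<String> consumed) {
        Set<String> seen = new HashSet<>(consumed);
        List<String> result = new ArrayList<>();
        for (String id : produced) {
            if (!seen.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> produced = Files.readAllLines(Paths.get("produced.log"));
        List<String> consumed = Files.readAllLines(Paths.get("consumed.log"));
        for (String id : missing(produced, consumed)) {
            System.out.println("never consumed: " + id);
        }
    }
}
```

Unlike diff, this reports a message as missing only if its ID is absent from 
consumed.log altogether, so reordered deliveries would not show up as false 
differences.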

Looking in the activemq log file, at around this point we see:

{noformat}
2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 349, enqueueSize: 10390
2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 350, enqueueSize: 10391
2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 351, enqueueSize: 10392
2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 352, enqueueSize: 10393
2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on size:0, lastCachedIdSeq: 10398 current node seqId: 10399
2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353, pagedInMessages.size 353, enqueueSize: 10395
2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
{noformat}

At the end of the log file, where we have a single message stuck on the queue, 
we see:

{noformat}
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
{noformat}

We can see the checkpoint failing to clean up the log files:

{noformat}
2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as contained ack(s) refer to referenced file: [1, 2]
2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as contained ack(s) refer to referenced file: [2, 3]
2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as contained ack(s) refer to referenced file: [3, 4]
2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as contained ack(s) refer to referenced file: [4, 5]
2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
{noformat}
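
To track which data files each checkpoint retains, the "not removing data 
file" entries can be extracted from the broker log. The following is a minimal 
sketch, assuming each entry occupies a single line in the log file and keeps 
the wording shown above. The class name CheckpointLogParser is hypothetical. 
One possible reading of the output, which we have not confirmed, is that each 
file is retained because its acks point at an earlier retained file, so a 
single unconsumed message in file 1 would be enough to keep the whole chain on 
disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointLogParser {
    // Matches the MessageDatabase DEBUG wording quoted in this report.
    private static final Pattern NOT_REMOVED = Pattern.compile(
            "not removing data file: (\\d+) as contained ack\\(s\\) refer to referenced file: \\[([^\\]]*)\\]");

    // Maps each retained data file number to the file numbers listed in its log entry.
    public static Map<Integer, List<Integer>> retainedFiles(List<String> logLines) {
        Map<Integer, List<Integer>> retained = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = NOT_REMOVED.matcher(line);
            if (!m.find()) {
                continue;
            }
            List<Integer> referenced = new ArrayList<>();
            for (String part : m.group(2).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    referenced.add(Integer.parseInt(trimmed));
                }
            }
            retained.put(Integer.parseInt(m.group(1)), referenced);
        }
        return retained;
    }

    public static void main(String[] args) {
        // Two entries copied from the checkpoint log in this report.
        List<String> lines = Arrays.asList(
                "2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as contained ack(s) refer to referenced file: [1, 2]",
                "2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as contained ack(s) refer to referenced file: [2, 3]");
        for (Map.Entry<Integer, List<Integer>> e : retainedFiles(lines).entrySet()) {
            System.out.println("data file " + e.getKey() + " retained, referenced files " + e.getValue());
        }
    }
}
```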

At this point our consumer had consumed all of the messages except the single 
stuck message.

We are using a clean, out-of-the-box setup: we have made no changes to the 
default activemq.xml file.



