Pero Atanasov created AMQ-5493:
----------------------------------

             Summary: KahaDB MessageDatabase race condition while stopping the 
broker and cleaning up
                 Key: AMQ-5493
                 URL: https://issues.apache.org/jira/browse/AMQ-5493
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.10.0
            Reporter: Pero Atanasov


This issue was seen on the surface via the following broker INFO log message:

2014-12-10 16:21:58,842 | INFO  | KahaDB: Recovering checkpoint thread after 
death | org.apache.activemq.store.kahadb.MessageDatabase | Thread-26

This means that the checkpoint thread is being revived unnecessarily while
stopping the broker. It could even happen that it is revived multiple times 
before the closing process is completed.

To show the flow of the race condition, consider the following blocks of code 
from 
activemq/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java:

Lines 425 - 442 [in close()]

if( opened.compareAndSet(true, false)) {
     ... some code ...
          if (metadata.page != null) {
               checkpointUpdate(true);
          }
     ... some code ...
}

Lines 1499 - 1501 [in checkpointUpdate(boolean)]

public void execute(Transaction tx) throws IOException {
     checkpointUpdate(tx, cleanup);
}

Line 1524 [in checkpointUpdate(Transaction, boolean)]

metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();

Line 1735 [in checkpointAckMessageFileMap()]

Location location = store(new 
KahaAckMessageFileMapCommand().setAckMessageFileMap(new 
Buffer(baos.toByteArray())), nullCompletionCallback);

Lines 993 - 995 [in store(...)]

if (checkpointThread != null && !checkpointThread.isAlive()) {
     startCheckpoint();
}

Lines 332  - 372  [In startCheckpoint()]
if (checkpointThread == null) {
     start = true;
} else if (!checkpointThread.isAlive()) {
     start = true;
      LOG.info("KahaDB: Recovering checkpoint thread after death");
}

if (start) {
     checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
          @Override
          public void run() {
               try {
                     ... some code ...
                     while (opened.get()) {
                          ... some code ...
                     }
                } catch (InterruptedException e) {
                            // Looks like someone really wants us to exit this 
thread...
                } catch (IOException ioe) {
                            LOG.error("Checkpoint failed", ioe);
                            brokerService.handleIOException(ioe);
               }
          }
     };

     checkpointThread.setDaemon(true);
     checkpointThread.start();
}

Here is the sequence of events, involving code above that shows the race:

1. in close() set "opened" to false
2. in startCheckpoint() "opened.get()" is false so checkpoint thread exits run 
method and makes checkpointThread.isAlive() false
3. in store(...) calling startCheckpoint() which will revive the checkpoint 
thread again

Added some INFO logs to confirm the order above:

2014-12-17 13:27:43,678 | INFO  | patanasov: from unload() calling close() | 
org.apache.activemq.store.kahadb.MessageDatabase | Thread-18
2014-12-17 13:27:43,678 | INFO  | patanasov: close(): set opened to false; 
calling checkpointUpdate(true) | 
org.apache.activemq.store.kahadb.MessageDatabase | Thread-18
2014-12-17 13:27:43,678 | INFO  | patanasov: startCheckpoint(): 
checkpointThread exiting its run method | 
org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint 
Worker
2014-12-17 13:27:43,681 | INFO  | patanasov: from store() calling 
startCheckpoint() | org.apache.activemq.store.kahadb.MessageDatabase | Thread-18
2014-12-17 13:27:43,682 | INFO  | KahaDB: Recovering checkpoint thread after 
death | org.apache.activemq.store.kahadb.MessageDatabase | Thread-18

Based on my limited understanding of this code, this does not seem to have any 
serious negative impacts, but it would be nice to be looked at by the 
community.   

It does not seem to make sense to revive the thread after opened.get() became 
false because the checkpointThread will not do anything anyway due to 
opened.get() being false at that point. Consider the body of the 
checkpointThread:

MessageDatabase.java, Lines 348 - 359

while (opened.get()){
     ... code that will attempt updates ...
}

However, since opened.get() is false, this will not enter the while loop and 
hence the checkpointThread will exit its run quickly again.

A possible fix for this is the following

Lines 993 - 995 [in store(...)]

change

if (checkpointThread != null && !checkpointThread.isAlive()) {
     startCheckpoint();
}

to

if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
     startCheckpoint();
}

If opened.get() is false, then we must be closing and we will not revive the 
thread. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to