Author: chirino
Date: Wed Nov 12 08:31:48 2008
New Revision: 713419

URL: http://svn.apache.org/viewvc?rev=713419&view=rev
Log:
Fixed a couple of bugs which were cropping up in the perf test.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
(original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
Wed Nov 12 08:31:48 2008
@@ -209,7 +209,6 @@
         }
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
-            reader.readLocationDetails(location);
             while (reader.readLocationDetailsAndValidate(location)) {
                 location.setOffset(location.getOffset() + location.getSize());
             }
@@ -426,6 +425,7 @@
                if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() 
) {
                        // It's an update to the current log file..
                        dataFile = dataFiles.getTail();
+                       dataFile.incrementLength(length);
                } else if( dataFiles.getTail().getDataFileId()+1 == 
loc.getDataFileId() ) {
                        // It's an update to the next log file.
             int nextNum = loc.getDataFileId();
@@ -438,8 +438,6 @@
                } else {
                        throw new IOException("Invalid external append.");
                }
-
-               dataFile.incrementLength(length);
        }
 
     public synchronized Location getNextLocation(Location location) throws 
IOException, IllegalStateException {
@@ -484,7 +482,7 @@
             // Load in location size and type.
             DataFileAccessor reader = 
accessorPool.openDataFileAccessor(dataFile);
             try {
-                reader.readLocationDetails(cur);
+                               reader.readLocationDetails(cur);
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Wed Nov 12 08:31:48 2008
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +107,7 @@
        }
 
        public void onClusterChange(ClusterState config) {
+               // For now, we don't really care about changes in the slave 
config..
        }
 
 
@@ -115,7 +115,6 @@
         * This is called by the Journal so that we can replicate the update to 
the 
         * slaves.
         */
-       @Override
        public void replicate(Location location, ByteSequence sequence, boolean 
sync) {
                if( sessions.isEmpty() ) 
                        return;
@@ -259,7 +258,7 @@
                                        for (DataFile df : 
journalFiles.values()) {
                                                // Look at what the slave has 
so that only the missing bits are transfered.
                                                String name = "journal-" + 
df.getDataFileId();
-                                               PBFileInfo slaveInfo = 
slaveFiles.get(name);
+                                               PBFileInfo slaveInfo = 
slaveFiles.remove(name);
                                                
                                                // Use the checksum info to see 
if the slave has the file already.. Checksums are less acurrate for
                                                // small amounts of data.. so 
ignore small files.
@@ -292,6 +291,13 @@
                                        snapshotInfos.add(info);
                                        
                                        
rcPayload.setCopyFilesList(snapshotInfos);
+                                       ArrayList<String> deleteFiles = new 
ArrayList<String>();
+                                       slaveFiles.remove("database");
+                                       for (PBFileInfo unused : 
slaveFiles.values()) {
+                                               
deleteFiles.add(unused.getName());
+                                       }
+                                       
rcPayload.setDeleteFilesList(deleteFiles);
+                                       
                                        
                                        updateJournalReplicatedFiles();
                                }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 Wed Nov 12 08:31:48 2008
@@ -118,7 +118,6 @@
                                        // If the slave service was not yet 
started.. start it up.
                                        if (slave == null) {
                                                LOG.info("Starting replication 
slave.");
-                                               store.open();
                                                slave = new 
ReplicationSlave(this);
                                                slave.start();
                                        }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 Wed Nov 12 08:31:48 2008
@@ -25,6 +25,7 @@
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -59,90 +60,164 @@
        private final ReplicationServer replicationServer;
        private Transport transport;
 
+       // Used to bulk transfer the master state over to the slave..
+       private final Object transferMutex = new Object();
+       private final LinkedList<PBFileInfo> transferQueue = new 
LinkedList<PBFileInfo>();
+       private final LinkedList<TransferSession> transferSessions = new 
LinkedList<TransferSession>();
+       private final HashMap<String, PBFileInfo> bulkFiles = new 
HashMap<String, PBFileInfo>();        
+       private PBSlaveInitResponse initResponse;
+       private boolean online;
+       private final AtomicBoolean started = new AtomicBoolean();
+       
+       // Used to do real time journal updates..
+       int journalUpdateFileId;
+       RandomAccessFile journalUpateFile;
+       private String master;
+       
        public ReplicationSlave(ReplicationServer replicationServer) {
                this.replicationServer = replicationServer;
+               master = replicationServer.getClusterState().getMaster();
        }
 
        public void start() throws Exception {
-               transport = TransportFactory.connect(new 
URI(replicationServer.getClusterState().getMaster()));
-               transport.setTransportListener(this);
-               transport.start();
-
-               // Make sure the replication directory exists.
-               replicationServer.getTempReplicationDir().mkdirs();
-               
-               ReplicationFrame frame = new ReplicationFrame();
-               frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
-               PBSlaveInit payload = new PBSlaveInit();
-               payload.setNodeId(replicationServer.getNodeId());
-               
-               // This call back is executed once the checkpoint is
-               // completed and all data has been
-               // synced to disk, but while a lock is still held on the
-               // store so that no
-               // updates are allowed.
+               if( started.compareAndSet(false, true)) {
+                       doStart();
+               }
+       }
+       
+       public void stop() throws Exception {
+               if( started.compareAndSet(true, false)) {
+                       doStop();
+               }
+       }
 
-               HashMap<String, PBFileInfo> infosMap = new HashMap<String, 
PBFileInfo>();
-               
-               // Add all the files that were being transfered..
-               File tempReplicationDir = 
replicationServer.getTempReplicationDir();
-               File[] list = tempReplicationDir.listFiles();
-               if( list!=null ) {
-                       for (File file : list) {
-                               String name = file.getName();
-                               if( name.startsWith("database-") ) {
-                                       int snapshot;
-                                       try {
-                                               snapshot = 
Integer.parseInt(name.substring("database-".length()));
-                                       } catch (NumberFormatException e) {
-                                               continue;
+       private void doStart() throws Exception, URISyntaxException, 
IOException {
+               synchronized (transferMutex) {
+                       
+                       // Failure recovery might be trying to start us back up,
+                       // but the Replication server may have already stopped 
us so there is no need to start up.
+                       if( !started.get() ) {
+                               return;
+                       }
+                       
+                       replicationServer.getStore().open();
+                       
+                       transport = TransportFactory.connect(new URI(master));
+                       transport.setTransportListener(this);
+                       transport.start();
+       
+                       // Make sure the replication directory exists.
+                       replicationServer.getTempReplicationDir().mkdirs();
+                       
+                       ReplicationFrame frame = new ReplicationFrame();
+                       frame.setHeader(new 
PBHeader().setType(PBType.SLAVE_INIT));
+                       PBSlaveInit payload = new PBSlaveInit();
+                       payload.setNodeId(replicationServer.getNodeId());
+                       
+                       // This call back is executed once the checkpoint is
+                       // completed and all data has been
+                       // synced to disk, but while a lock is still held on the
+                       // store so that no
+                       // updates are allowed.
+       
+                       HashMap<String, PBFileInfo> infosMap = new 
HashMap<String, PBFileInfo>();
+                       
+                       // Add all the files that were being transferred..
+                       File tempReplicationDir = 
replicationServer.getTempReplicationDir();
+                       File[] list = tempReplicationDir.listFiles();
+                       if( list!=null ) {
+                               for (File file : list) {
+                                       String name = file.getName();
+                                       if( name.startsWith("database-") ) {
+                                               int snapshot;
+                                               try {
+                                                       snapshot = 
Integer.parseInt(name.substring("database-".length()));
+                                               } catch (NumberFormatException 
e) {
+                                                       continue;
+                                               }
+                                               
+                                               PBFileInfo info = 
replicationServer.createInfo("database", file, 0, file.length());
+                                               info.setSnapshotId(snapshot);
+                                               infosMap.put("database", info);
+                                       } else if( name.startsWith("journal-") 
) {
+                                               PBFileInfo info = 
replicationServer.createInfo(name, file, 0, file.length());
+                                               infosMap.put(name, info);
                                        }
-                                       
-                                       PBFileInfo info = 
replicationServer.createInfo("database", file, 0, file.length());
-                                       info.setSnapshotId(snapshot);
-                                       infosMap.put("database", info);
-                               } else if( name.startsWith("journal-") ) {
-                                       PBFileInfo info = 
replicationServer.createInfo(name, file, 0, file.length());
-                                       infosMap.put(name, info);
                                }
                        }
-               }
-               
-               // Add all the db files that were not getting transfered..
-               KahaDBStore store = replicationServer.getStore();
-               Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
-               for (DataFile df : journalFiles.values()) {
-                       String name = "journal-" + df.getDataFileId();
-                       // Did we have a transfer in progress for that file 
already?
-                       if( infosMap.containsKey(name) ) {
-                               continue;
-                       }
-                       infosMap.put(name, replicationServer.createInfo(name, 
df.getFile(), 0, df.getLength()));
-               }
-               if( !infosMap.containsKey("database") ) {
-                       File pageFile = store.getPageFile().getFile();
-                       if( pageFile.exists() ) {
-                               infosMap.put("database", 
replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+                       
+                       // Add all the db files that were not getting 
transferred..
+                       KahaDBStore store = replicationServer.getStore();
+                       Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
+                       for (DataFile df : journalFiles.values()) {
+                               String name = "journal-" + df.getDataFileId();
+                               // Did we have a transfer in progress for that 
file already?
+                               if( infosMap.containsKey(name) ) {
+                                       continue;
+                               }
+                               infosMap.put(name, 
replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+                       }
+                       if( !infosMap.containsKey("database") ) {
+                               File pageFile = store.getPageFile().getFile();
+                               if( pageFile.exists() ) {
+                                       infosMap.put("database", 
replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+                               }
                        }
+                       
+                       ArrayList<PBFileInfo> infos = new 
ArrayList<PBFileInfo>(infosMap.size());
+                       for (PBFileInfo info : infosMap.values()) {
+                               infos.add(info);
+                       }
+                       payload.setCurrentFilesList(infos);
+                       
+                       frame.setPayload(payload);
+                       LOG.info("Sending master slave init command: " + 
payload);
+                       online = false;
+                       transport.oneway(frame);
                }
-               
-               ArrayList<PBFileInfo> infos = new 
ArrayList<PBFileInfo>(infosMap.size());
-               for (PBFileInfo info : infosMap.values()) {
-                       infos.add(info);
-               }
-               payload.setCurrentFilesList(infos);
-               
-               frame.setPayload(payload);
-               LOG.info("Sending master slave init command: " + payload);
-               bulkSynchronizing = true;
-               transport.oneway(frame);
-
        }
 
-       public void stop() throws Exception {
+       private void doStop() throws Exception, IOException {
+               synchronized (transferMutex) {
+                       if( this.transport!=null ) {
+                               this.transport.stop();
+                               this.transport=null;
+                       }
+       
+                       // Stop any current transfer sessions.
+                       for (TransferSession session : this.transferSessions) {
+                               session.stop();
+                       }
+       
+                       this.transferQueue.clear();
+                       
+                       this.initResponse=null;
+                       this.bulkFiles.clear(); 
+                       this.online=false;
+       
+                       if( journalUpateFile !=null ) {
+                               journalUpateFile.close();
+                               journalUpateFile=null;
+                       }
+                       journalUpdateFileId=0;
+                       
+                       replicationServer.getStore().close();
+               }
        }
 
        public void onClusterChange(ClusterState config) {
+               synchronized (transferMutex) {
+                       // When the master changes.. we need to re-sync with 
the new master.
+                       if( !master.equals(config.getMaster()) ) {
+                               try {
+                                       doStop();
+                                       master = config.getMaster();
+                                       doStart();
+                               } catch (Exception e) {
+                                       LOG.error("Could not restart syncing 
with new master: "+config.getMaster()+", due to: "+e,e);
+                               }
+                       }
+               }
        }
 
        public void onCommand(Object command) {
@@ -164,29 +239,23 @@
                failed(error);
        }
 
-       public void failed(Exception error) {
+       public void failed(Throwable error) {
                try {
-                       LOG.warn("Replication session fail to master: 
"+transport.getRemoteAddress(), error);
-                       stop();
+                       if( started.get() ) {
+                               LOG.warn("Replication session fail to master: 
"+transport.getRemoteAddress(), error);
+                               doStop();
+                               // Wait a little and try to establish the 
session again..
+                               Thread.sleep(1000);
+                               doStart();
+                       }
                } catch (Exception ignore) {
                }
        }
 
        public void transportInterupted() {
        }
-
        public void transportResumed() {
        }
-
-       private Object transferMutex = new Object();
-       private LinkedList<PBFileInfo> transferQueue = new 
LinkedList<PBFileInfo>();
-       private boolean bulkSynchronizing;
-       private PBSlaveInitResponse initResponse;
-
-       int journalUpdateFileId;
-       RandomAccessFile journalUpateFile;
-       
-       HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, 
PBFileInfo>();
        
        private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate 
update) throws IOException {
                boolean onlineRecovery=false;
@@ -199,7 +268,7 @@
                                }
                                File file;
                                String name = "journal-"+location.getFileId();
-                               if( bulkSynchronizing ) {
+                               if( !online ) {
                                        file = 
replicationServer.getTempReplicationFile(name, 0);
                                        if( !bulkFiles.containsKey(name) ) {
                                                bulkFiles.put(name, new 
PBFileInfo().setName(name));
@@ -211,10 +280,12 @@
                                }
                                journalUpateFile = new RandomAccessFile(file, 
"rw");
                                journalUpdateFileId = location.getFileId();
-                       }                       
+                       }
+                       
+//                     System.out.println("Writing: 
"+location.getFileId()+":"+location.getOffset()+" with "+data.length);
                        journalUpateFile.seek(location.getOffset());
                        journalUpateFile.write(data);
-                       if( !bulkSynchronizing ) {
+                       if( online ) {
                                onlineRecovery=true;
                        }
                }
@@ -235,45 +306,52 @@
                return rc;
        }
        
-       private void commitBulkTransfer() throws IOException {
-               synchronized (transferMutex) {
+       private void commitBulkTransfer() {
+               try {
                        
-                       LOG.info("Slave synhcronization complete, going 
online...");
+                       synchronized (transferMutex) {
+                               
+                               LOG.info("Slave synhcronization complete, going 
online...");
 
-                       if( journalUpateFile!=null ) {
-                               journalUpateFile.close();
-                               journalUpateFile=null;
-                       }
-                       replicationServer.getStore().close();
-                       
-                       // If we got a new snapshot of the database, then we 
need to 
-                       // delete it's assisting files too.
-                       if( bulkFiles.containsKey("database") ) {
-                               PageFile pageFile = 
replicationServer.getStore().getPageFile();
-                               pageFile.getRecoveryFile().delete();
-                               pageFile.getFreeFile().delete();
-                       }
-                       
-                       for (PBFileInfo info : bulkFiles.values()) {
-                               File from = 
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
-                               File to = 
replicationServer.getReplicationFile(info.getName());
-                               move(from, to);
+                               replicationServer.getStore().close();
+                               
+                               if( journalUpateFile!=null ) {
+                                       journalUpateFile.close();
+                                       journalUpateFile=null;
+                               }
+                               
+                               // If we got a new snapshot of the database, 
then we need to 
+                               // delete its assisting files too.
+                               if( bulkFiles.containsKey("database") ) {
+                                       PageFile pageFile = 
replicationServer.getStore().getPageFile();
+                                       pageFile.getRecoveryFile().delete();
+                                       pageFile.getFreeFile().delete();
+                               }
+                               
+                               for (PBFileInfo info : bulkFiles.values()) {
+                                       File from = 
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+                                       File to = 
replicationServer.getReplicationFile(info.getName());
+                                       to.getParentFile().mkdirs();
+                                       move(from, to);
+                               }
+                               
+                               delete(initResponse.getDeleteFilesList());
+                               online=true;
+                               
+                               replicationServer.getStore().open();
+                               
+                               LOG.info("Slave is now online.  We are now 
eligible to become the master.");
                        }
                        
-                       delete(initResponse.getDeleteFilesList());
-                       bulkSynchronizing=false;
+                       // Let the master know we are now online.
+                       ReplicationFrame frame = new ReplicationFrame();
+                       frame.setHeader(new 
PBHeader().setType(PBType.SLAVE_ONLINE));
+                       transport.oneway(frame);
                        
-                       replicationServer.getStore().open();
-                       
-                       LOG.info("Slave is now online.  We are now eligible to 
become the master.");
+               } catch (Throwable e) {
+                       e.printStackTrace();
+                       failed(e);
                }
-               
-               // Let the master know we are now online.
-               ReplicationFrame frame = new ReplicationFrame();
-               frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
-               transport.oneway(frame);
-               
-               replicationServer.getStore().incrementalRecover();
        }
 
        private void onSlaveInitResponse(ReplicationFrame frame, 
PBSlaveInitResponse response) throws Exception {
@@ -320,11 +398,9 @@
                }
        }
 
-       LinkedList<TransferSession> transferSessions = new 
LinkedList<TransferSession>();
-
-       private void addTransferSession() throws Exception {
+       private void addTransferSession() {
                synchronized (transferMutex) {
-                       while (!transferQueue.isEmpty() && 
transferSessions.size() < MAX_TRANSFER_SESSIONS) {
+                       while (transport!=null && !transferQueue.isEmpty() && 
transferSessions.size() < MAX_TRANSFER_SESSIONS) {
                                TransferSession transferSession = new 
TransferSession();
                                transferSessions.add(transferSession);
                                try {
@@ -357,11 +433,11 @@
                        } finally {
                                try {
                                        is.close();
-                               } finally {
+                               } catch(Throwable e) {
                                }
                                try {
                                        os.close();
-                               } finally {
+                               } catch(Throwable e) {
                                }
                        }
                        from.delete();
@@ -418,22 +494,11 @@
                                        }
                                        info = null;
                                }
-                               Thread stopThread = new Thread("Transfer 
Session Shutdown: " + transport.getRemoteAddress()) {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       transport.stop();
-                                                       synchronized 
(transferMutex) {
-                                                               
transferSessions.remove(TransferSession.this);
-                                                               
addTransferSession();
-                                                       }
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                               }
-                                       }
-                               };
-                               stopThread.setDaemon(true);
-                               stopThread.start();
+                               transport.stop();
+                               synchronized (transferMutex) {
+                                       
transferSessions.remove(TransferSession.this);
+                                       addTransferSession();
+                               }
                        }
                }
 

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 Wed Nov 12 08:31:48 2008
@@ -149,16 +149,16 @@
     protected boolean syncWrites=true;
     int checkpointInterval = 5*1000;
     int cleanupInterval = 30*1000;
-    boolean opened;
     
     protected AtomicBoolean started = new AtomicBoolean();
+    protected AtomicBoolean opened = new AtomicBoolean();
 
     public MessageDatabase() {
     }
 
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            load();
+               load();
         }
     }
 
@@ -211,7 +211,7 @@
        }
        
        public void open() throws IOException {
-               if( !opened ) {
+               if( opened.compareAndSet(false, true) ) {
                getJournal();
                if (failIfJournalIsLocked) {
                    journal.lock();
@@ -232,74 +232,75 @@
                getPageFile();
                journal.start();
                loadPageFile();
-               opened=true;
+               
+               checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+                   public void run() {
+                       try {
+                           long lastCleanup = System.currentTimeMillis();
+                           long lastCheckpoint = System.currentTimeMillis();
+                           
+                           // Sleep for a short time so we can periodically check 
+                           // to see if we need to exit this thread.
+                           long sleepTime = Math.min(checkpointInterval, 500);
+                           while (opened.get()) {
+                               Thread.sleep(sleepTime);
+                               long now = System.currentTimeMillis();
+                               if( now - lastCleanup >= cleanupInterval ) {
+                                   checkpointCleanup(true);
+                                   lastCleanup = now;
+                                   lastCheckpoint = now;
+                               } else if( now - lastCheckpoint >= checkpointInterval ) {
+                                   checkpointCleanup(false);
+                                   lastCheckpoint = now;
+                               }
+                           }
+                       } catch (InterruptedException e) {
+                           // Looks like someone really wants us to exit this thread...
+                       }
+                   }
+               };
+               checkpointThread.start();
+            recover();
                }
        }
        
     public void load() throws IOException {
        
-       open();
-        if (deleteAllMessages) {
-            journal.delete();
-
-            pageFile.unload();
-            pageFile.delete();
-            metadata = new Metadata();
-            
-            LOG.info("Persistence store purged.");
-            deleteAllMessages = false;
-            
-            loadPageFile();
-        }
-       
         synchronized (indexMutex) {
-            recover();
+               open();
+               
+               if (deleteAllMessages) {
+                   journal.delete();
+       
+                   pageFile.unload();
+                   pageFile.delete();
+                   metadata = new Metadata();
+                   
+                   LOG.info("Persistence store purged.");
+                   deleteAllMessages = false;
+                   
+                   loadPageFile();
+               }
+               store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+
         }
 
-        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-            public void run() {
-                try {
-                    long lastCleanup = System.currentTimeMillis();
-                    long lastCheckpoint = System.currentTimeMillis();
-                    
-                    // Sleep for a short time so we can periodically check 
-                    // to see if we need to exit this thread.
-                    long sleepTime = Math.min(checkpointInterval, 500);
-                    while (started.get()) {
-                        Thread.sleep(sleepTime);
-                        long now = System.currentTimeMillis();
-                        if( now - lastCleanup >= cleanupInterval ) {
-                            checkpointCleanup(true);
-                            lastCleanup = now;
-                            lastCheckpoint = now;
-                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-                            checkpointCleanup(false);
-                            lastCheckpoint = now;
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    // Looks like someone really wants us to exit this thread...
-                }
-            }
-        };
-        checkpointThread.start();
     }
 
     
-       public void close() throws IOException {
-        synchronized (indexMutex) {
-            pageFile.unload();
-            metadata = new Metadata();
-        }
-        journal.close();
-        opened=false;
+       public void close() throws IOException, InterruptedException {
+               if( opened.compareAndSet(true, false)) {
+               synchronized (indexMutex) {
+                   pageFile.unload();
+                   metadata = new Metadata();
+               }
+               journal.close();
+               checkpointThread.join();
+               }
        }
        
     public void unload() throws IOException, InterruptedException {
-        checkpointThread.join();
-
         synchronized (indexMutex) {
-            
             metadata.state = CLOSED_STATE;
             metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
 
@@ -308,13 +309,8 @@
                     tx.store(metadata.page, metadataMarshaller, true);
                 }
             });
-
-            pageFile.unload();
-            metadata = new Metadata();
+            close();
         }
-        store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
-        journal.close();
-        opened=false;
     }
 
     /**
@@ -356,6 +352,7 @@
 
         while (recoveryPosition != null) {
             JournalCommand message = load(recoveryPosition);
+            metadata.lastUpdate = recoveryPosition;
             process(message, recoveryPosition);
             redoCounter++;
             recoveryPosition = journal.getNextLocation(recoveryPosition);
@@ -377,6 +374,7 @@
         }
         while (nextRecoveryPosition != null) {
                lastRecoveryPosition = nextRecoveryPosition;
+            metadata.lastUpdate = lastRecoveryPosition;
             JournalCommand message = load(lastRecoveryPosition);
             process(message, lastRecoveryPosition);            
             nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
@@ -403,15 +401,17 @@
     protected void checkpointCleanup(final boolean cleanup) {
         try {
             synchronized (indexMutex) {
+               if( !opened.get() ) {
+                       return;
+               }
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         checkpointUpdate(tx, cleanup);
                     }
                 });
-                pageFile.flush();
             }
-            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
         } catch (IOException e) {
+               e.printStackTrace();
         }
     }
 
@@ -426,7 +426,6 @@
             pageFile.flush();
             closure.execute();
         }
-        store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
        }
 
     // /////////////////////////////////////////////////////////////////


Reply via email to