Repository: activemq Updated Branches: refs/heads/master e093a8c1d -> bf8eb08ac
AMQ-7086 - make kahadb gc/cleanup on shutdown optional to trade availability over disk usage for fast failover Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bf8eb08a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bf8eb08a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bf8eb08a Branch: refs/heads/master Commit: bf8eb08acaeec653d04daa0b8b6dd889ef990bed Parents: e093a8c Author: gtully <gary.tu...@gmail.com> Authored: Wed Oct 31 14:29:05 2018 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Wed Oct 31 14:29:05 2018 +0000 ---------------------------------------------------------------------- .../activemq/store/PersistenceAdapter.java | 4 +- .../store/kahadb/AbstractKahaDBStore.java | 11 +- .../store/kahadb/KahaDBPersistenceAdapter.java | 17 +- .../activemq/store/kahadb/MessageDatabase.java | 16 +- .../kahadb/MultiKahaDBPersistenceAdapter.java | 4 +- .../kahadb/scheduler/JobSchedulerStoreImpl.java | 6 +- .../store/kahadb/scheduler/AMQ7086Test.java | 184 +++++++++++++++++++ 7 files changed, 228 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index 07063b4..7bad926 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -183,11 +183,11 @@ public interface PersistenceAdapter extends Service { /** * checkpoint any * - * @param sync + * @param cleanup * @throws IOException * */ - void checkpoint(boolean sync) 
throws IOException; + void checkpoint(boolean cleanup) throws IOException; /** * A hint to return the size of the store on disk http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java index 6003c87..70be2fb 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java @@ -55,6 +55,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport { protected boolean failIfDatabaseIsLocked; protected long checkpointInterval = 5*1000; protected long cleanupInterval = 30*1000; + private boolean cleanupOnStop = true; protected boolean checkForCorruptJournalFiles = false; protected boolean checksumJournalFiles = true; protected boolean forceRecoverIndex = false; @@ -202,6 +203,14 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport { this.cleanupInterval = cleanupInterval; } + public void setCleanupOnStop(boolean cleanupOnStop) { + this.cleanupOnStop = cleanupOnStop; + } + + public boolean getCleanupOnStop() { + return this.cleanupOnStop; + } + public boolean isChecksumJournalFiles() { return checksumJournalFiles; } @@ -666,7 +675,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport { */ protected void startCheckpoint() { if (checkpointInterval == 0 && cleanupInterval == 0) { - LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); + LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? 
"shutdown/" : "") + "restart"); return; } synchronized (checkpointThreadLock) { http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index fbeda4c..5d6e896 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -79,13 +79,13 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements } /** - * @param sync + * @param cleanup * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) */ @Override - public void checkpoint(boolean sync) throws IOException { - this.letter.checkpoint(sync); + public void checkpoint(boolean cleanup) throws IOException { + this.letter.checkpoint(cleanup); } /** @@ -817,4 +817,15 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements public boolean isPersistNoLocal() { return this.letter.isPersistNoLocal(); } + + /* + * When set, ensure that the cleanup/gc operation is executed during the stop procedure + */ + public void setCleanupOnStop(boolean cleanupOnStop) { + this.letter.setCleanupOnStop(cleanupOnStop); + } + + public boolean getCleanupOnStop() { + return this.letter.getCleanupOnStop(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 22a2c1e..db6239a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -266,6 +266,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe long journalDiskSyncInterval = 1000; long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; + boolean cleanupOnStop = true; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; boolean enableIndexWriteAsync = false; @@ -375,7 +376,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private void startCheckpoint() { if (checkpointInterval == 0 && cleanupInterval == 0) { - LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); + LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? 
"shutdown/" : "") + "restart"); return; } synchronized (schedulerLock) { @@ -508,7 +509,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointLock.writeLock().lock(); try { if (metadata.page != null) { - checkpointUpdate(true); + checkpointUpdate(getCleanupOnStop()); } pageFile.unload(); metadata = createMetadata(); @@ -1147,9 +1148,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe after.run(); } - if (scheduler == null && opened.get()) { - startCheckpoint(); - } return location; } catch (IOException ioe) { LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); @@ -3311,6 +3309,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.cleanupInterval = cleanupInterval; } + public boolean getCleanupOnStop() { + return cleanupOnStop; + } + + public void setCleanupOnStop(boolean cleanupOnStop) { + this.cleanupOnStop = cleanupOnStop; + } + public void setJournalMaxFileLength(int journalMaxFileLength) { this.journalMaxFileLength = journalMaxFileLength; } http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index 4bdb8de..f143a07 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -173,9 +173,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } @Override - public void checkpoint(final boolean sync) throws IOException { + 
public void checkpoint(final boolean cleanup) throws IOException { for (PersistenceAdapter persistenceAdapter : adapters) { - persistenceAdapter.checkpoint(sync); + persistenceAdapter.checkpoint(cleanup); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 05ca383..79059f1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -59,6 +59,10 @@ import org.apache.activemq.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/* + * @org.apache.xbean.XBean element="kahaDBJobScheduler" + */ + public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore { private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); @@ -230,7 +234,7 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch checkpointLock.writeLock().lock(); try { if (metaData.getPage() != null) { - checkpointUpdate(true); + checkpointUpdate(getCleanupOnStop()); } } finally { checkpointLock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.scheduler;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.File;
import java.io.FilenameFilter;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
 * Exercises the {@code cleanupOnStop} option (AMQ-7086) on both the KahaDB message store and the
 * KahaDB job scheduler store.
 *
 * <p>Each test starts a broker, pushes scheduled messages through it, stops the broker and then
 * counts the journal data files left in each store directory.
 */
public class AMQ7086Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ7086Test.class);

    BrokerService brokerService;
    JobSchedulerStoreImpl jobSchedulerStore;
    KahaDBPersistenceAdapter kahaDBPersistenceAdapter;

    /**
     * Stops the broker if a test failed before reaching its own {@code stop()} call, so that a
     * running broker is not left behind for the next test, which reuses the same directories.
     *
     * <p>NOTE(review): relies on a second {@code BrokerService.stop()} being a no-op on an
     * already stopped broker - confirm against the BrokerService in use.
     */
    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
        }
    }

    /**
     * With {@code cleanupOnStop=true}, expects exactly one journal data file to remain in each
     * store directory once the broker has stopped.
     */
    @Test
    public void testGcDoneAtStop() throws Exception {

        brokerService = createBroker(true);
        brokerService.start();

        produceWithScheduledDelayAndConsume();

        LOG.info("job store: " + jobSchedulerStore);
        int numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
        LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
        int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();

        // report each store's own count (the scheduler count used to be dropped from this message)
        LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);

        // pull the dirs before we stop
        File jobDir = jobSchedulerStore.getJournal().getDirectory();
        File kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();

        brokerService.stop();

        assertEquals("Expected job store data files", 1, verifyFilesOnDisk(jobDir));
        assertEquals("Expected kahadb data files", 1, verifyFilesOnDisk(kahaDir));
    }

    /**
     * With {@code cleanupOnStop=false}, expects the number of journal data files on disk after
     * the stop to equal the number each journal reported just before the stop.
     */
    @Test
    public void testNoGcAtStop() throws Exception {

        brokerService = createBroker(false);
        brokerService.start();

        produceWithScheduledDelayAndConsume();

        LOG.info("job store: " + jobSchedulerStore);
        int numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
        LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
        int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();

        // report each store's own count (the scheduler count used to be dropped from this message)
        LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);

        // pull the dirs before we stop
        File jobDir = jobSchedulerStore.getJournal().getDirectory();
        File kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();

        brokerService.stop();

        assertEquals("Expected job store data files", numSchedulerFiles, verifyFilesOnDisk(jobDir));
        assertEquals("Expected kahadb data files", numKahadbFiles, verifyFilesOnDisk(kahaDir));
    }

    /**
     * Counts, and logs the path of, every file directly inside {@code directory} whose name
     * starts with {@link Journal#DEFAULT_FILE_PREFIX} and ends with
     * {@link Journal#DEFAULT_FILE_SUFFIX}.
     *
     * @param directory the journal directory to inspect
     * @return the number of matching files, or 0 when the directory cannot be listed
     */
    private int verifyFilesOnDisk(final File directory) {

        LOG.info("Broker: " + brokerService);
        LOG.info("dir: " + directory);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(Journal.DEFAULT_FILE_PREFIX) && n.endsWith(Journal.DEFAULT_FILE_SUFFIX);
            }
        });

        LOG.info("File count: " + (files != null ? files.length : " empty!"));

        if (files == null) {
            // listFiles() returns null when the path is not a directory or an I/O error occurs;
            // bail out here instead of hitting a NullPointerException in the loop below
            return 0;
        }
        for (File file : files) {
            LOG.info("name :" + file.getAbsolutePath());
        }
        return files.length;
    }

    /**
     * Builds (but does not start) a broker with scheduler support, backed by freshly emptied
     * {@code target/scheduler} and {@code target/kahadb} directories. Both stores use 16 KiB
     * journal files. The created stores are kept in the {@code jobSchedulerStore} and
     * {@code kahaDBPersistenceAdapter} fields.
     *
     * @param doCleanupOnStop value passed to {@code setCleanupOnStop} on both stores
     * @return the configured broker
     */
    protected BrokerService createBroker(boolean doCleanupOnStop) throws Exception {
        File schedulerDirectory = new File("target/scheduler");
        File kahadbDir = new File("target/kahadb");

        // start every test from empty store directories
        for (File directory : new File[]{schedulerDirectory, kahadbDir}) {
            IOHelper.mkdirs(directory);
            IOHelper.deleteChildren(directory);
        }

        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setSchedulerSupport(true);

        jobSchedulerStore = new JobSchedulerStoreImpl();
        jobSchedulerStore.setDirectory(schedulerDirectory);
        jobSchedulerStore.setJournalMaxFileLength(16 * 1024);

        jobSchedulerStore.setCheckpointInterval(0);
        jobSchedulerStore.setCleanupOnStop(doCleanupOnStop);

        broker.setJobSchedulerStore(jobSchedulerStore);

        kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(kahadbDir);
        kahaDBPersistenceAdapter.setJournalMaxFileLength(16 * 1024);

        kahaDBPersistenceAdapter.setCleanupInterval(0);
        kahaDBPersistenceAdapter.setCleanupOnStop(doCleanupOnStop);

        broker.setPersistenceAdapter(kahaDBPersistenceAdapter);

        return broker;
    }

    /**
     * Sends 50 bytes messages of 1024 bytes each to queue {@code QQ} over {@code vm://localhost},
     * each with a scheduled delay of 1000 ms, then receives 50 messages from the same queue,
     * asserting that each arrives within 5000 ms. Finally sleeps for one second so the last
     * acknowledgement can settle.
     */
    public void produceWithScheduledDelayAndConsume() throws Exception {
        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            connection.start();
            final ActiveMQQueue destination = new ActiveMQQueue("QQ");
            final int numMessages = 50;
            final long time = 1000L;
            final byte[] payload = new byte[1024];
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < numMessages; i++) {
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.writeBytes(payload);
                bytesMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
                producer.send(bytesMessage);
            }

            MessageConsumer messageConsumer = session.createConsumer(destination);
            for (int i = 0; i < numMessages; i++) {
                assertNotNull(messageConsumer.receive(5000));
            }
        } finally {
            // close even when a send, receive or assertion fails, so the connection is not leaked
            connection.close();
        }

        // let last ack settle
        TimeUnit.SECONDS.sleep(1);
    }
}