http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java new file mode 100644 index 0000000..cf87cde --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase { + + @Override + protected SequentialFileFactory createFactory(String folder) { + return new MappedSequentialFileFactory(new File(folder)); + } + + @Test + public void testInterrupts() throws Throwable { + + final EncodingSupport fakeEncoding = new EncodingSupport() { + @Override + public int getEncodeSize() { + return 10; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + buffer.writeBytes(new byte[10]); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + + } + }; + + final AtomicInteger calls = new AtomicInteger(0); + final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> { + new Exception("shutdown").printStackTrace(); + calls.incrementAndGet(); + }); + + Thread threadOpen = new Thread() { + @Override + public void run() { + try { + Thread.currentThread().interrupt(); + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadOpen.start(); + threadOpen.join(); + + Thread threadClose = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + Thread.currentThread().interrupt(); + file.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadClose.start(); + threadClose.join(); + + Thread threadWrite = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.write(fakeEncoding, true); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWrite.start(); + threadWrite.join(); + + Thread threadFill = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.fill(1024); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadFill.start(); + threadFill.join(); + + Thread threadWriteDirect = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + ByteBuffer buffer = ByteBuffer.allocate(10); + buffer.put(new byte[10]); + Thread.currentThread().interrupt(); + file.writeDirect(buffer, true); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWriteDirect.start(); + threadWriteDirect.join(); + + Thread threadRead = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + file.position(0); + ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize()); + Thread.currentThread().interrupt(); + file.read(readBytes); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadRead.start(); + threadRead.join(); + + // An interrupt exception shouldn't issue a shutdown + Assert.assertEquals(0, calls.get()); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 8f15c48..d2ffd6f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -22,8 +22,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.ArtemisConstants; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { internalTest("nio2", getTestDir(), 10000, 0, true, true, 1); } + @Test + public void testMMap() throws Exception { + internalTest("mmap", getTestDir(), 10000, 100, true, true, 1); + } + + @Test + public void testMMAPHugeTransaction() throws Exception { + internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1); + } + + @Test + public void testMMAPOMultiThread() throws Exception { + internalTest("mmap", getTestDir(), 1000, 100, true, true, 10); + } + + @Test + public void testMMAPNonTransactional() throws Exception { + internalTest("mmap", getTestDir(), 10000, 0, true, true, 1); + } + + // Package protected --------------------------------------------- private void internalTest(final String type, @@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { if (args.length != 5) { System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() + - " aio|nio <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>"); + " aio|nio|mmap <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>"); System.exit(-1); } System.out.println("Running"); @@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { } public static JournalImpl createJournal(final String journalType, final String journalDir) { - JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500); + JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir, 10485760), "journaltst", "tst", 500); return journal; } - public static SequentialFileFactory getFactory(final String factoryType, final String directory) { + public static SequentialFileFactory getFactory(final String factoryType, final String directory, int fileSize) { if (factoryType.equals("aio")) { return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false); } else if (factoryType.equals("nio2")) { return new NIOSequentialFileFactory(new File(directory), true, 1); + } else if (factoryType.equals("mmap")) { + return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + code.printStackTrace(); + } + }, true).chunkBytes(fileSize).overlapBytes(0); } else { return new NIOSequentialFileFactory(new File(directory), false, 1); }
