http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java deleted file mode 100644 index 2575afd..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.perf; - -import java.io.File; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.perf.SimpleQueueTest; -import org.apache.activemq.store.kahadb.TempKahaDBStore; - -/** - * - */ -public class TempKahaStoreQueueTest extends SimpleQueueTest { - - @Override - protected void configureBroker(BrokerService answer, String uri) throws Exception { - File dataFileDir = new File("target/test-amq-data/perfTest/temp-amqdb"); - dataFileDir.mkdirs(); - answer.setDeleteAllMessagesOnStartup(true); - - TempKahaDBStore adaptor = new TempKahaDBStore(); - adaptor.setDirectory(dataFileDir); - - answer.setDataDirectoryFile(dataFileDir); - answer.setPersistenceAdapter(adaptor); - answer.addConnector(uri); - } - -} -
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java deleted file mode 100644 index 6626603..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.plist; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.*; -import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; -import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.usage.SystemUsage; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -/** - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> - */ -public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport { - - @Test - public void testAddRemoveAddIndexSize() throws Exception { - brokerService = new BrokerService(); - brokerService.setUseJmx(false); - SystemUsage usage = brokerService.getSystemUsage(); - usage.getMemoryUsage().setLimit(1024 * 150); - String body = new String(new byte[1024]); - Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null); - - underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false); - underTest.setSystemUsage(usage); - - LOG.info("start"); - final PageFile pageFile = ((PListImpl) underTest.getDiskList()).getPageFile(); - LOG.info("page count: " + pageFile.getPageCount()); - LOG.info("free count: " + pageFile.getFreePageCount()); - LOG.info("content size: " + pageFile.getPageContentSize()); - - final long initialPageCount = pageFile.getPageCount(); - - final int numMessages = 1000; - - for (int j = 0; j < 10; j++) { - // ensure free pages are reused - for (int i = 0; i < numMessages; i++) { - ActiveMQMessage mqMessage = new ActiveMQMessage(); - mqMessage.setStringProperty("body", body); - mqMessage.setMessageId(new MessageId("1:2:3:" + i)); - mqMessage.setMemoryUsage(usage.getMemoryUsage()); - mqMessage.setRegionDestination(destination); - underTest.addMessageLast(new IndirectMessageReference(mqMessage)); - } - assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull()); - - underTest.reset(); - long receivedCount = 0; - while (underTest.hasNext()) { - MessageReference ref = underTest.next(); - underTest.remove(); - ref.decrementReferenceCount(); - assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId()); - } - assertEquals("got all messages back", receivedCount, numMessages); - LOG.info("page count: " + pageFile.getPageCount()); - LOG.info("free count: " + pageFile.getFreePageCount()); - LOG.info("content size: " + pageFile.getPageContentSize()); - } - - assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount()); - - LOG.info("Destroy"); - underTest.destroy(); - LOG.info("page count: " + pageFile.getPageCount()); - LOG.info("free count: " + pageFile.getFreePageCount()); - LOG.info("content size: " + pageFile.getPageContentSize()); - assertEquals("expected page usage", initialPageCount - 1, pageFile.getPageCount() - pageFile.getFreePageCount()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java deleted file mode 100644 index 73adb52..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java +++ /dev/null @@ -1,669 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.plist; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.store.PList; -import org.apache.activemq.store.PListEntry; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PListTest { - - static final Logger LOG = LoggerFactory.getLogger(PListTest.class); - private PListStoreImpl store; - private PListImpl plist; - final ByteSequence payload = new ByteSequence(new byte[400]); - final String idSeed = new String("Seed" + Arrays.toString(new byte[1024])); - final Vector<Throwable> exceptions = new Vector<>(); - ExecutorService executor; - - private PListEntry getFirst(PList plist) throws IOException { - PList.PListIterator iterator = plist.iterator(); - try { - if (iterator.hasNext()) { - return iterator.next(); - } - else { - return null; - } - } - finally { - iterator.release(); - } - } - - @Test - public void testAddLast() throws Exception { - final int COUNT = 1000; - Map<String, ByteSequence> map = new LinkedHashMap<>(); - for (int i = 0; i < COUNT; i++) { - String test = new String("test" + i); - ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); - plist.addLast(test, bs); - } - assertEquals(plist.size(), COUNT); - int count = 0; - for (ByteSequence bs : map.values()) { - String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength()); - PListEntry entry = plist.get(count); - String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength()); - assertEquals(origStr, plistString); - count++; - } - } - - @Test - public void testAddFirst() throws Exception { - final int COUNT = 1000; - Map<String, ByteSequence> map = new LinkedHashMap<>(); - for (int i = 0; i < COUNT; i++) { - String test = new String("test" + i); - ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); - plist.addFirst(test, bs); - } - assertEquals(plist.size(), COUNT); - long count = plist.size() - 1; - for (ByteSequence bs : map.values()) { - String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength()); - PListEntry entry = plist.get(count); - String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength()); - assertEquals(origStr, plistString); - count--; - } - } - - @Test - public void testRemove() throws IOException { - doTestRemove(2000); - } - - protected void doTestRemove(final int COUNT) throws IOException { - Map<String, ByteSequence> map = new LinkedHashMap<>(); - for (int i = 0; i < COUNT; i++) { - String test = new String("test" + i); - ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); - plist.addLast(test, bs); - } - assertEquals(plist.size(), COUNT); - PListEntry entry = plist.getFirst(); - while (entry != null) { - plist.remove(entry.getId()); - entry = plist.getFirst(); - } - assertEquals(0, plist.size()); - } - - @Test - public void testDestroy() throws Exception { - doTestRemove(1); - plist.destroy(); - assertEquals(0, plist.size()); - } - - @Test - public void testDestroyNonEmpty() throws Exception { - final int COUNT = 1000; - Map<String, ByteSequence> map = new LinkedHashMap<>(); - for (int i = 0; i < COUNT; i++) { - String test = new String("test" + i); - ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); - plist.addLast(test, bs); - } - plist.destroy(); - assertEquals(0, plist.size()); - } - - @Test - public void testRemoveSecond() throws Exception { - plist.addLast("First", new ByteSequence("A".getBytes())); - plist.addLast("Second", new ByteSequence("B".getBytes())); - - assertTrue(plist.remove("Second")); - assertTrue(plist.remove("First")); - assertFalse(plist.remove("doesNotExist")); - } - - @Test - public void testRemoveSingleEntry() throws Exception { - plist.addLast("First", new ByteSequence("A".getBytes())); - - Iterator<PListEntry> iterator = plist.iterator(); - while (iterator.hasNext()) { - iterator.next(); - iterator.remove(); - } - } - - @Test - public void testRemoveSecondPosition() throws Exception { - plist.addLast("First", new ByteSequence("A".getBytes())); - plist.addLast("Second", new ByteSequence("B".getBytes())); - - assertTrue(plist.remove(1)); - assertTrue(plist.remove(0)); - assertFalse(plist.remove(0)); - } - - @Test - public void testConcurrentAddRemove() throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setCleanupInterval(400); - store.setDirectory(directory); - store.setJournalMaxFileLength(1024 * 5); - store.setLazyInit(false); - store.start(); - - final ByteSequence payload = new ByteSequence(new byte[1024 * 2]); - - final Vector<Throwable> exceptions = new Vector<>(); - final int iterations = 1000; - final int numLists = 10; - - final PList[] lists = new PList[numLists]; - String threadName = Thread.currentThread().getName(); - for (int i = 0; i < numLists; i++) { - Thread.currentThread().setName("C:" + String.valueOf(i)); - lists[i] = store.getPList(String.valueOf(i)); - } - Thread.currentThread().setName(threadName); - - executor = Executors.newFixedThreadPool(100); - class A implements Runnable { - - @Override - public void run() { - final String threadName = Thread.currentThread().getName(); - try { - for (int i = 0; i < iterations; i++) { - PList candidate = lists[i % numLists]; - Thread.currentThread().setName("ALRF:" + candidate.getName()); - synchronized (plistLocks(candidate)) { - Object locator = candidate.addLast(String.valueOf(i), payload); - getFirst(candidate); - assertTrue(candidate.remove(locator)); - } - } - } - catch (Exception error) { - LOG.error("Unexpcted ex", error); - error.printStackTrace(); - exceptions.add(error); - } - finally { - Thread.currentThread().setName(threadName); - } - } - } - - class B implements Runnable { - - @Override - public void run() { - final String threadName = Thread.currentThread().getName(); - try { - for (int i = 0; i < iterations; i++) { - PList candidate = lists[i % numLists]; - Thread.currentThread().setName("ALRF:" + candidate.getName()); - synchronized (plistLocks(candidate)) { - Object locator = candidate.addLast(String.valueOf(i), payload); - getFirst(candidate); - assertTrue(candidate.remove(locator)); - } - } - } - catch (Exception error) { - error.printStackTrace(); - exceptions.add(error); - } - finally { - Thread.currentThread().setName(threadName); - } - } - } - - executor.execute(new A()); - executor.execute(new A()); - executor.execute(new A()); - executor.execute(new B()); - executor.execute(new B()); - executor.execute(new B()); - - executor.shutdown(); - boolean finishedInTime = executor.awaitTermination(30, TimeUnit.SECONDS); - - assertTrue("no exceptions", exceptions.isEmpty()); - assertTrue("finished ok", finishedInTime); - } - - @Test - public void testConcurrentAddLast() throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setDirectory(directory); - store.start(); - - final int numThreads = 20; - final int iterations = 1000; - executor = Executors.newFixedThreadPool(100); - for (int i = 0; i < numThreads; i++) { - new Job(i, PListTest.TaskType.ADD, iterations).run(); - } - - for (int i = 0; i < numThreads; i++) { - executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations)); - } - - for (int i = 0; i < 100; i++) { - executor.execute(new Job(i + 20, PListTest.TaskType.ADD, 100)); - } - - executor.shutdown(); - boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS); - assertTrue("finished ok", finishedInTime); - } - - @Test - public void testOverFlow() throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setDirectory(directory); - store.start(); - - for (int i = 0; i < 2000; i++) { - new Job(i, PListTest.TaskType.ADD, 5).run(); - - } - LOG.info("After Load index file: " + store.pageFile.getFile().length()); - LOG.info("After remove index file: " + store.pageFile.getFile().length()); - } - - @Test - public void testConcurrentAddRemoveWithPreload() throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setDirectory(directory); - store.setJournalMaxFileLength(1024 * 5); - store.setCleanupInterval(5000); - store.setIndexWriteBatchSize(500); - store.start(); - - final int iterations = 500; - final int numLists = 10; - - // prime the store - - // create/delete - LOG.info("create"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.CREATE, iterations).run(); - } - - LOG.info("delete"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.DELETE, iterations).run(); - } - - LOG.info("fill"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.ADD, iterations).run(); - } - LOG.info("remove"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.REMOVE, iterations).run(); - } - - LOG.info("check empty"); - for (int i = 0; i < numLists; i++) { - assertEquals("empty " + i, 0, store.getPList("List-" + i).size()); - } - - LOG.info("delete again"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.DELETE, iterations).run(); - } - - LOG.info("fill again"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.ADD, iterations).run(); - } - - LOG.info("parallel add and remove"); - executor = Executors.newFixedThreadPool(numLists * 2); - for (int i = 0; i < numLists * 2; i++) { - executor.execute(new Job(i, i >= numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations)); - } - - executor.shutdown(); - LOG.info("wait for parallel work to complete"); - boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS); - assertTrue("no exceptions", exceptions.isEmpty()); - assertTrue("finished ok", finishedInTime); - } - - // for non determinant issues, increasing this may help diagnose - final int numRepeats = 1; - - @Test - public void testRepeatStressWithCache() throws Exception { - for (int i = 0; i < numRepeats; i++) { - do_testConcurrentAddIterateRemove(true); - } - } - - @Test - public void testRepeatStressWithOutCache() throws Exception { - for (int i = 0; i < numRepeats; i++) { - do_testConcurrentAddIterateRemove(false); - } - } - - public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setIndexEnablePageCaching(enablePageCache); - store.setIndexPageSize(2 * 1024); - store.setDirectory(directory); - store.start(); - - final int iterations = 500; - final int numLists = 10; - - LOG.info("create"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.CREATE, iterations).run(); - } - - LOG.info("fill"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.ADD, iterations).run(); - } - - LOG.info("parallel add and remove"); - executor = Executors.newFixedThreadPool(400); - final int numProducer = 5; - final int numConsumer = 10; - for (int i = 0; i < numLists; i++) { - for (int j = 0; j < numProducer; j++) { - executor.execute(new Job(i, PListTest.TaskType.ADD, iterations * 2)); - } - for (int k = 0; k < numConsumer; k++) { - executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations / 4)); - } - } - - for (int i = numLists; i < numLists * 10; i++) { - executor.execute(new Job(i, PListTest.TaskType.ADD, iterations)); - } - - executor.shutdown(); - LOG.info("wait for parallel work to complete"); - boolean shutdown = executor.awaitTermination(60 * 60, TimeUnit.SECONDS); - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - assertTrue("test did not timeout ", shutdown); - } - - @Test - public void testConcurrentAddIterate() throws Exception { - File directory = store.getDirectory(); - store.stop(); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setIndexPageSize(2 * 1024); - store.setJournalMaxFileLength(1024 * 1024); - store.setDirectory(directory); - store.setCleanupInterval(-1); - store.setIndexEnablePageCaching(false); - store.setIndexWriteBatchSize(100); - store.start(); - - final int iterations = 250; - final int numLists = 10; - - LOG.info("create"); - for (int i = 0; i < numLists; i++) { - new Job(i, PListTest.TaskType.CREATE, iterations).run(); - } - - LOG.info("parallel add and iterate"); - // We want a lot of adds occurring so that new free pages get created - // along - // with overlapping seeks from the iterators so that we are likely to - // seek into - // some bad area in the page file. - executor = Executors.newFixedThreadPool(100); - final int numProducer = 30; - final int numConsumer = 10; - for (int i = 0; i < numLists; i++) { - for (int j = 0; j < numProducer; j++) { - executor.execute(new Job(i, PListTest.TaskType.ADD, iterations)); - } - for (int k = 0; k < numConsumer; k++) { - executor.execute(new Job(i, TaskType.ITERATE, iterations * 2)); - } - } - - executor.shutdown(); - LOG.info("wait for parallel work to complete"); - boolean shutdown = executor.awaitTermination(5 * 60, TimeUnit.SECONDS); - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - assertTrue("test did not timeout ", shutdown); - LOG.info("Num dataFiles:" + store.getJournal().getFiles().size()); - } - - enum TaskType { - CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE - } - - class Job implements Runnable { - - int id; - TaskType task; - int iterations; - - public Job(int id, TaskType t, int iterations) { - this.id = id; - this.task = t; - this.iterations = iterations; - } - - @Override - public void run() { - final String threadName = Thread.currentThread().getName(); - try { - PListImpl plist = null; - switch (task) { - case CREATE: - Thread.currentThread().setName("C:" + id); - plist = store.getPList(String.valueOf(id)); - LOG.info("Job-" + id + ", CREATE"); - break; - case DELETE: - Thread.currentThread().setName("D:" + id); - store.removePList(String.valueOf(id)); - break; - case ADD: - Thread.currentThread().setName("A:" + id); - plist = store.getPList(String.valueOf(id)); - - for (int j = 0; j < iterations; j++) { - synchronized (plistLocks(plist)) { - if (exceptions.isEmpty()) { - plist.addLast("PL>" + id + idSeed + "-" + j, payload); - } - else { - break; - } - } - } - - if (exceptions.isEmpty()) { - LOG.info("Job-" + id + ", Add, done: " + iterations); - } - break; - case REMOVE: - Thread.currentThread().setName("R:" + id); - plist = store.getPList(String.valueOf(id)); - synchronized (plistLocks(plist)) { - - for (int j = iterations - 1; j >= 0; j--) { - plist.remove("PL>" + id + idSeed + "-" + j); - if (j > 0 && j % (iterations / 2) == 0) { - LOG.info("Job-" + id + " Done remove: " + j); - } - } - } - break; - case ITERATE: - Thread.currentThread().setName("I:" + id); - plist = store.getPList(String.valueOf(id)); - int iterateCount = 0; - synchronized (plistLocks(plist)) { - if (exceptions.isEmpty()) { - Iterator<PListEntry> iterator = plist.iterator(); - while (iterator.hasNext() && exceptions.isEmpty()) { - iterator.next(); - iterateCount++; - } - - // LOG.info("Job-" + id + " Done iterate: it=" + - // iterator + ", count:" + iterateCount + - // ", size:" + plist.size()); - if (plist.size() != iterateCount) { - System.err.println("Count Wrong: " + iterator); - } - assertEquals("iterate got all " + id + " iterator:" + iterator, plist.size(), iterateCount); - } - } - break; - - case ITERATE_REMOVE: - Thread.currentThread().setName("IRM:" + id); - plist = store.getPList(String.valueOf(id)); - - int removeCount = 0; - synchronized (plistLocks(plist)) { - - Iterator<PListEntry> removeIterator = plist.iterator(); - - while (removeIterator.hasNext()) { - removeIterator.next(); - removeIterator.remove(); - if (removeCount++ > iterations) { - break; - } - } - } - LOG.info("Job-" + id + " Done remove: " + removeCount); - break; - - default: - } - - } - catch (Exception e) { - LOG.warn("Job[" + id + "] caught exception: " + e.getMessage()); - e.printStackTrace(); - exceptions.add(e); - if (executor != null) { - executor.shutdownNow(); - } - } - finally { - Thread.currentThread().setName(threadName); - } - } - } - - final Map<PList, Object> locks = new HashMap<>(); - - private Object plistLocks(PList plist) { - Object lock = null; - synchronized (locks) { - if (locks.containsKey(plist)) { - lock = locks.get(plist); - } - else { - lock = new Object(); - locks.put(plist, lock); - } - } - return lock; - } - - @Before - public void setUp() throws Exception { - File directory = new File("target/test/PlistDB"); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - startStore(directory); - - } - - protected void startStore(File directory) throws Exception { - store = new PListStoreImpl(); - store.setDirectory(directory); - store.start(); - plist = store.getPList("main"); - } - - @After - public void tearDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } - store.stop(); - exceptions.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml deleted file mode 100644 index 5042df8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml +++ /dev/null @@ -1,59 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:amq="http://activemq.apache.org/schema/core" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd - http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - - <!-- normal ActiveMQ XML config which is less verbose & can be validated --> - <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false" - useLoggingForShutdownErrors="true" useJmx="true" - persistent="true" vmConnectorURI="vm://javacoola" - useShutdownHook="false" deleteAllMessagesOnStartup="true"> - - <amq:persistenceAdapter> - <amq:kahaDB directory = "target/activemq-data"> - <amq:locker> - <amq:shared-file-locker lockAcquireSleepInterval="5000"/> - </amq:locker> - </amq:kahaDB> - </amq:persistenceAdapter> - - <amq:systemUsage> - <amq:systemUsage> - <amq:memoryUsage> - <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/> - </amq:memoryUsage> - <amq:storeUsage> - <amq:storeUsage limit="1 gb" name="foo"/> - </amq:storeUsage> - <amq:tempUsage> - <amq:tempUsage limit="100 mb"/> - </amq:tempUsage> - </amq:systemUsage> - </amq:systemUsage> - - <amq:transportConnectors> - <amq:transportConnector uri="tcp://localhost:61635"/> - </amq:transportConnectors> - - </amq:broker> - -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java deleted file mode 100644 index 3c28b3d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.leveldb; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.cursors.NegativeQueueTest; -import org.apache.activemq.leveldb.LevelDBStore; -import org.apache.activemq.util.IOHelper; - -import java.io.File; - -public class LevelDBNegativeQueueTest extends NegativeQueueTest { - - @Override - protected void configureBroker(BrokerService answer) throws Exception { - super.configureBroker(answer); - LevelDBStore levelDBStore = new LevelDBStore(); - File directory = new File("target/activemq-data/leveldb"); - IOHelper.deleteChildren(directory); - levelDBStore.setDirectory(directory); - levelDBStore.deleteAllMessages(); - answer.setPersistenceAdapter(levelDBStore); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java deleted file mode 100644 index 99583d5..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.leveldb; - -import java.io.File; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerTest; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.leveldb.LevelDBStore; - -/** - * Once the wire format is completed we can test against real persistence storage. - */ -public class LevelDBStoreBrokerTest extends BrokerTest { - - @Override - protected void setUp() throws Exception { - this.setAutoFail(true); - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - LevelDBStore levelDBStore = new LevelDBStore(); - File directory = new File("target/activemq-data/leveldb"); - IOHelper.deleteChildren(directory); - levelDBStore.setDirectory(directory); - levelDBStore.deleteAllMessages(); - broker.setPersistenceAdapter(levelDBStore); - return broker; - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/leveldb")); - broker.setPersistenceAdapter(kaha); - return broker; - } - - public static Test suite() { - return suite(LevelDBStoreBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java deleted file mode 100644 index 38f0213..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.streams; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQInputStream; -import org.apache.activemq.ActiveMQOutputStream; -import org.apache.activemq.JmsTestSupport; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; - -/** - * JMSInputStreamTest - */ -@Deprecated -public class JMSInputStreamTest extends JmsTestSupport { - - public Destination destination; - protected DataOutputStream out; - protected DataInputStream in; - private ActiveMQConnection connection2; - - private ActiveMQInputStream amqIn; - private ActiveMQOutputStream amqOut; - - public static Test suite() { - return suite(JMSInputStreamTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - public void initCombos() { - addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC")}); - } - - @Override - protected void setUp() throws Exception { - super.setAutoFail(true); - super.setUp(); - } - - private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException { - connection2 = (ActiveMQConnection) factory.createConnection(userName, password); - connections.add(connection2); - if (props != null) { - amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - else { - amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination); - } - - out = new DataOutputStream(amqOut); - if (timeout == -1) { - amqIn = (ActiveMQInputStream) connection2.createInputStream(destination); - } - else { - amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout); - } - in = new DataInputStream(amqIn); - } - - /* - * @see TestCase#tearDown() - */ - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - /** - * Test for AMQ-3010 - */ - public void testInputStreamTimeout() throws Exception { - long timeout = 500; - - setUpConnection(null, timeout); - try { - in.read(); - fail(); - } - catch (ActiveMQInputStream.ReadTimeoutException e) { - // timeout reached, everything ok - } - in.close(); - } - - // Test for AMQ-2988 - public void testStreamsWithProperties() throws Exception { - String name1 = "PROPERTY_1"; - String name2 = "PROPERTY_2"; - String value1 = "VALUE_1"; - String value2 = "VALUE_2"; - Map<String, Object> jmsProperties = new HashMap<>(); - jmsProperties.put(name1, value1); - jmsProperties.put(name2, value2); - setUpConnection(jmsProperties, -1); - - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - // check properties before we try to read the stream - checkProperties(jmsProperties); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - - // check again after read was done - checkProperties(jmsProperties); - } - - public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception { - String name1 = "PROPERTY_1"; - String name2 = "PROPERTY_2"; - String value1 = "VALUE_1"; - String value2 = "VALUE_2"; - Map<String, Object> jmsProperties = new HashMap<>(); - jmsProperties.put(name1, value1); - jmsProperties.put(name2, value2); - - ActiveMQDestination dest = (ActiveMQDestination) destination; - - if (dest.isQueue()) { - destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); - } - else { - destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); - } - - setUpConnection(jmsProperties, -1); - - assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly()); - - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - // check properties before we try to read the stream - checkProperties(jmsProperties); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - - // check again after read was done - checkProperties(jmsProperties); - } - - // check if the received stream has the properties set - // Test for AMQ-2988 - private void checkProperties(Map<String, Object> jmsProperties) throws IOException { - Map<String, Object> receivedJmsProps = amqIn.getJMSProperties(); - - // we should at least have the same amount or more properties - assertTrue(jmsProperties.size() <= receivedJmsProps.size()); - - // check the properties to see if we have everything in there - Iterator<String> propsIt = jmsProperties.keySet().iterator(); - while (propsIt.hasNext()) { - String key = propsIt.next(); - assertTrue(receivedJmsProps.containsKey(key)); - assertEquals(jmsProperties.get(key), receivedJmsProps.get(key)); - } - } - - public void testLarge() throws Exception { - setUpConnection(null, -1); - - final int testData = 23; - final int dataLength = 4096; - final int count = 1024; - byte[] data = new byte[dataLength]; - for (int i = 0; i < data.length; i++) { - data[i] = testData; - } - final AtomicBoolean complete = new AtomicBoolean(false); - Thread runner = new Thread(new Runnable() { - @Override - public void run() { - try { - for (int x = 0; x < count; x++) { - byte[] b = new byte[2048]; - in.readFully(b); - for (int i = 0; i < b.length; i++) { - assertTrue(b[i] == testData); - } - } - complete.set(true); - synchronized (complete) { - complete.notify(); - } - } - catch (Exception ex) { - ex.printStackTrace(); - } - } - }); - runner.start(); - for (int i = 0; i < count; i++) { - out.write(data); - } - out.flush(); - synchronized (complete) { - while (!complete.get()) { - complete.wait(30000); - } - } - assertTrue(complete.get()); - } - - public void testStreams() throws Exception { - setUpConnection(null, -1); - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java new file mode 100755 index 0000000..45be088 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.test; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * + */ +public class JmsResourceProvider { + + private String serverUri = "vm://localhost?broker.persistent=false"; + private boolean transacted; + private int ackMode = Session.AUTO_ACKNOWLEDGE; + private boolean isTopic; + private int deliveryMode = DeliveryMode.PERSISTENT; + private String durableName = "DummyName"; + private String clientID = getClass().getName(); + + /** + * Creates a connection factory. + * + * @see org.apache.activemq.test.JmsResourceProvider#createConnectionFactory() + */ + public ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(serverUri); + } + + /** + * Creates a connection. + * + * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory) + */ + public Connection createConnection(ConnectionFactory cf) throws JMSException { + Connection connection = cf.createConnection(); + if (getClientID() != null) { + connection.setClientID(getClientID()); + } + return connection; + } + + /** + * @see org.apache.activemq.test.JmsResourceProvider#createSession(javax.jms.Connection) + */ + public Session createSession(Connection conn) throws JMSException { + return conn.createSession(transacted, ackMode); + } + + /** + * @see org.apache.activemq.test.JmsResourceProvider#createConsumer(javax.jms.Session, + * javax.jms.Destination) + */ + public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { + if (isDurableSubscriber()) { + return session.createDurableSubscriber((Topic)destination, durableName); + } + return session.createConsumer(destination); + } + + /** + * Creates a connection for a consumer. + * + * @param ssp - ServerSessionPool + * @return ConnectionConsumer + */ + public ConnectionConsumer createConnectionConsumer(Connection connection, Destination destination, ServerSessionPool ssp) throws JMSException { + return connection.createConnectionConsumer(destination, null, ssp, 1); + } + + /** + * Creates a producer. + * + * @see org.apache.activemq.test.JmsResourceProvider#createProducer(javax.jms.Session, + * javax.jms.Destination) + */ + public MessageProducer createProducer(Session session, Destination destination) throws JMSException { + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + return producer; + } + + /** + * Creates a destination, which can either a topic or a queue. + * + * @see org.apache.activemq.test.JmsResourceProvider#createDestination(javax.jms.Session, + * String) + */ + public Destination createDestination(Session session, String name) throws JMSException { + if (isTopic) { + return session.createTopic("TOPIC." + name); + } else { + return session.createQueue("QUEUE." + name); + } + } + + /** + * Returns true if the subscriber is durable. + * + * @return isDurableSubscriber + */ + public boolean isDurableSubscriber() { + return isTopic && durableName != null; + } + + /** + * Returns the acknowledgement mode. + * + * @return Returns the ackMode. + */ + public int getAckMode() { + return ackMode; + } + + /** + * Sets the acnknowledgement mode. + * + * @param ackMode The ackMode to set. + */ + public void setAckMode(int ackMode) { + this.ackMode = ackMode; + } + + /** + * Returns true if the destination is a topic, false if the destination is a + * queue. + * + * @return Returns the isTopic. + */ + public boolean isTopic() { + return isTopic; + } + + /** + * @param isTopic The isTopic to set. + */ + public void setTopic(boolean isTopic) { + this.isTopic = isTopic; + } + + /** + * Returns the server URI. + * + * @return Returns the serverUri. + */ + public String getServerUri() { + return serverUri; + } + + /** + * Sets the server URI. + * + * @param serverUri - the server URI to set. + */ + public void setServerUri(String serverUri) { + this.serverUri = serverUri; + } + + /** + * Return true if the session is transacted. + * + * @return Returns the transacted. + */ + public boolean isTransacted() { + return transacted; + } + + /** + * Sets the session to be transacted. + * + * @param transacted + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + if (transacted) { + setAckMode(Session.SESSION_TRANSACTED); + } + } + + /** + * Returns the delivery mode. + * + * @return deliveryMode + */ + public int getDeliveryMode() { + return deliveryMode; + } + + /** + * Sets the delivery mode. + * + * @param deliveryMode + */ + public void setDeliveryMode(int deliveryMode) { + this.deliveryMode = deliveryMode; + } + + /** + * Returns the client id. + * + * @return clientID + */ + public String getClientID() { + return clientID; + } + + /** + * Sets the client id. + * + * @param clientID + */ + public void setClientID(String clientID) { + this.clientID = clientID; + } + + /** + * Returns the durable name of the provider. + * + * @return durableName + */ + public String getDurableName() { + return durableName; + } + + /** + * Sets the durable name of the provider. + * + * @param durableName + */ + public void setDurableName(String durableName) { + this.durableName = durableName; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java new file mode 100755 index 0000000..be32877 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.test; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; +import java.io.File; +import java.lang.reflect.Array; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Useful base class for unit test cases + * + * + */ +public abstract class TestSupport extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class); + + protected ActiveMQConnectionFactory connectionFactory; + protected boolean topic = true; + + public TestSupport() { + super(); + } + + public TestSupport(String name) { + super(name); + } + + /** + * Creates an ActiveMQMessage. + * + * @return ActiveMQMessage + */ + protected ActiveMQMessage createMessage() { + return new ActiveMQMessage(); + } + + /** + * Creates a destination. + * + * @param subject - topic or queue name. + * @return Destination - either an ActiveMQTopic or ActiveMQQUeue. + */ + protected Destination createDestination(String subject) { + if (topic) { + return new ActiveMQTopic(subject); + } else { + return new ActiveMQQueue(subject); + } + } + + /** + * Tests if firstSet and secondSet are equal. + * + * @param messsage - string to be displayed when the assertion fails. + * @param firstSet[] - set of messages to be compared with its counterpart + * in the secondset. + * @param secondSet[] - set of messages to be compared with its counterpart + * in the firstset. + * @throws javax.jms.JMSException + */ + protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException { + assertTextMessagesEqual("", firstSet, secondSet); + } + + /** + * Tests if firstSet and secondSet are equal. + * + * @param messsage - string to be displayed when the assertion fails. + * @param firstSet[] - set of messages to be compared with its counterpart + * in the secondset. + * @param secondSet[] - set of messages to be compared with its counterpart + * in the firstset. + */ + protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) throws JMSException { + assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length); + + for (int i = 0; i < secondSet.length; i++) { + TextMessage m1 = (TextMessage)firstSet[i]; + TextMessage m2 = (TextMessage)secondSet[i]; + assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2); + } + } + + /** + * Tests if m1 and m2 are equal. + * + * @param m1 - message to be compared with m2. + * @param m2 - message to be compared with m1. + * @throws javax.jms.JMSException + */ + protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException { + assertEquals("", m1, m2); + } + + /** + * Tests if m1 and m2 are equal. + * + * @param message - string to be displayed when the assertion fails. + * @param m1 - message to be compared with m2. + * @param m2 - message to be compared with m1. + */ + protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException { + assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); + + if (m1 == null) { + return; + } + + assertEquals(message, m1.getText(), m2.getText()); + } + + /** + * Tests if m1 and m2 are equal. + * + * @param m1 - message to be compared with m2. + * @param m2 - message to be compared with m1. + * @throws javax.jms.JMSException + */ + protected void assertEquals(Message m1, Message m2) throws JMSException { + assertEquals("", m1, m2); + } + + /** + * Tests if m1 and m2 are equal. + * + * @param message - error message. + * @param m1 - message to be compared with m2. + * @param m2 -- message to be compared with m1. + */ + protected void assertEquals(String message, Message m1, Message m2) throws JMSException { + assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); + + if (m1 == null) { + return; + } + + assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass()); + + if (m1 instanceof TextMessage) { + assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2); + } else { + assertEquals(message, m1, m2); + } + } + + /** + * Test if base directory contains spaces + */ + protected void assertBaseDirectoryContainsSpaces() { + assertFalse("Base directory cannot contain spaces.", new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" ")); + } + + /** + * Creates an ActiveMQConnectionFactory. + * + * @return ActiveMQConnectionFactory + * @throws Exception + */ + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + } + + /** + * Factory method to create a new connection. + * + * @return connection + * @throws Exception + */ + protected Connection createConnection() throws Exception { + return getConnectionFactory().createConnection(); + } + + /** + * Creates an ActiveMQ connection factory. + * + * @return connectionFactory + * @throws Exception + */ + public ActiveMQConnectionFactory getConnectionFactory() throws Exception { + if (connectionFactory == null) { + connectionFactory = createConnectionFactory(); + assertTrue("Should have created a connection factory!", connectionFactory != null); + } + + return connectionFactory; + } + + /** + * Returns the consumer subject. + * + * @return String + */ + protected String getConsumerSubject() { + return getSubject(); + } + + /** + * Returns the producer subject. + * + * @return String + */ + protected String getProducerSubject() { + return getSubject(); + } + + /** + * Returns the subject. + * + * @return String + */ + protected String getSubject() { + return getClass().getName() + "." + getName(); + } + + protected void assertArrayEqual(String message, Object[] expected, Object[] actual) { + assertEquals(message + ". Array length", expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(message + ". element: " + i, expected[i], actual[i]); + } + } + + protected void assertPrimitiveArrayEqual(String message, Object expected, Object actual) { + int length = Array.getLength(expected); + assertEquals(message + ". Array length", length, Array.getLength(actual)); + for (int i = 0; i < length; i++) { + assertEquals(message + ". element: " + i, Array.get(expected, i), Array.get(actual, i)); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java index 9902bd2..56d1546 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java @@ -16,13 +16,16 @@ */ package org.apache.activemq.transport; +import org.junit.Before; + /** * */ public class QueueClusterTest extends TopicClusterTest { @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { topic = false; super.setUp(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java index 1b95006..3506ff0 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java @@ -21,42 +21,50 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsTestSupport; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.SocketProxy; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SoWriteTimeoutClientTest extends JmsTestSupport { +public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class); + private String messageTextPrefix = ""; + private EmbeddedJMS server; + + @Before + public void setUp() throws Exception { + Configuration config = this.createConfig(0); + server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); + } - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - adapter.setConcurrentStoreAndDispatchQueues(false); - broker.setPersistenceAdapter(adapter); - broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0"); - return broker; + @After + public void tearDown() throws Exception { + server.stop(); } + @Test public void testSendWithClientWriteTimeout() throws Exception { final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout"); messageTextPrefix = initMessagePrefix(80 * 1024); - URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); + URI tcpBrokerUri = new URI(newURI(0)); LOG.info("consuming using uri: " + tcpBrokerUri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri); @@ -92,25 +100,34 @@ public class SoWriteTimeoutClientTest extends JmsTestSupport { TimeUnit.SECONDS.sleep(8); proxy.goOn(); for (int i = 0; i < messageCount; i++) { - assertNotNull("Got message " + i + " after reconnect", consumer.receive(5000)); + Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000)); } - assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { + Assert.assertNull(consumer.receive(5000)); - LOG.info("current total message count: " + broker.getAdminView().getTotalMessageCount()); - return broker.getAdminView().getTotalMessageCount() == 0; - } - })); + } + + protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + sendMessages(session, destination, count); + session.close(); + } + + protected void sendMessages(Session session, Destination destination, int count) throws JMSException { + MessageProducer producer = session.createProducer(destination); + sendMessages(session, producer, count); + producer.close(); + } + + protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException { + for (int i = 0; i < count; i++) { + producer.send(session.createTextMessage(messageTextPrefix + i)); + } } private String initMessagePrefix(int i) { byte[] content = new byte[i]; return new String(content); } +} - public static Test suite() { - return suite(SoWriteTimeoutClientTest.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java index 5157c33..c2a7d24 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java @@ -17,9 +17,6 @@ package org.apache.activemq.transport; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; @@ -33,22 +30,23 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.ServiceStopper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ -public class TopicClusterTest extends TestCase implements MessageListener { +public class TopicClusterTest extends OpenwireArtemisBaseTest implements MessageListener { protected static final int MESSAGE_COUNT = 50; protected static final int NUMBER_IN_CLUSTER = 3; @@ -60,11 +58,11 @@ public class TopicClusterTest extends TestCase implements MessageListener { protected int deliveryMode = DeliveryMode.NON_PERSISTENT; protected MessageProducer[] producers; protected Connection[] connections; - protected List<BrokerService> services = new ArrayList<>(); + protected EmbeddedJMS[] servers = new EmbeddedJMS[NUMBER_IN_CLUSTER]; protected String groupId; - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { groupId = "topic-cluster-test-" + System.currentTimeMillis(); connections = new Connection[NUMBER_IN_CLUSTER]; producers = new MessageProducer[NUMBER_IN_CLUSTER]; @@ -73,11 +71,13 @@ public class TopicClusterTest extends TestCase implements MessageListener { if (root == null) { root = "target/store"; } + + this.setUpClusterServers(servers); try { for (int i = 0; i < NUMBER_IN_CLUSTER; i++) { System.setProperty("activemq.store.dir", root + "_broker_" + i); - connections[i] = createConnection("broker-" + i); + connections[i] = createConnection(i); connections[i].setClientID("ClusterTest" + i); connections[i].start(); Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -95,42 +95,35 @@ public class TopicClusterTest extends TestCase implements MessageListener { } } - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { if (connections != null) { for (int i = 0; i < connections.length; i++) { - connections[i].close(); + try { + connections[i].close(); + } catch (Exception e) { + //ignore. + } } } - ServiceStopper stopper = new ServiceStopper(); - stopper.stopServices(services); + this.shutDownClusterServers(servers); } protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException { return session.createConsumer(destination); } - protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws Exception { - BrokerService container = new BrokerService(); - container.setBrokerName(brokerName); - - String url = "tcp://localhost:0"; - TransportConnector connector = container.addConnector(url); - connector.setDiscoveryUri(new URI("multicast://default?group=" + groupId)); - container.addNetworkConnector("multicast://default?group=" + groupId); - container.start(); - - services.add(container); - - return new ActiveMQConnectionFactory("vm://" + brokerName); + protected ActiveMQConnectionFactory createGenericClusterFactory(int serverID) throws Exception { + String url = newURI(serverID); + return new ActiveMQConnectionFactory(url); } protected int expectedReceiveCount() { return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER; } - protected Connection createConnection(String name) throws Exception { - return createGenericClusterFactory(name).createConnection(); + protected Connection createConnection(int serverID) throws Exception { + return createGenericClusterFactory(serverID).createConnection(); } protected Destination createDestination() { @@ -146,10 +139,6 @@ public class TopicClusterTest extends TestCase implements MessageListener { } } - /** - * @param msg - */ - @Override public void onMessage(Message msg) { // log.info("GOT: " + msg); receivedMessageCount.incrementAndGet(); @@ -160,9 +149,7 @@ public class TopicClusterTest extends TestCase implements MessageListener { } } - /** - * @throws Exception - */ + @Test public void testSendReceive() throws Exception { for (int i = 0; i < MESSAGE_COUNT; i++) { TextMessage textMessage = new ActiveMQTextMessage(); @@ -178,8 +165,8 @@ public class TopicClusterTest extends TestCase implements MessageListener { } // sleep a little - to check we don't get too many messages Thread.sleep(2000); - LOG.info("GOT: " + receivedMessageCount.get()); - assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get()); + LOG.info("GOT: " + receivedMessageCount.get() + " Expected: " + expectedReceiveCount()); + Assert.assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get()); } }