Working on a test case for https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5161087f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5161087f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5161087f Branch: refs/heads/activemq-5.9 Commit: 5161087f31f6217d423c4683041c18bc5655889e Parents: 251b7da Author: Hiram Chirino <[email protected]> Authored: Wed Oct 30 15:35:26 2013 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Mar 11 21:16:58 2014 -0400 ---------------------------------------------------------------------- .../leveldb/test/ElectingLevelDBStoreTest.java | 84 +-------- .../test/ReplicatedLevelDBBrokerTest.java | 187 +++++++++++++++++++ .../leveldb/test/ZooKeeperTestSupport.java | 111 +++++++++++ 3 files changed, 300 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java index f3c1581..c876f51 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java @@ -16,25 +16,17 @@ */ package org.apache.activemq.leveldb.test; -import junit.framework.TestCase; import org.apache.activemq.Service; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.leveldb.CountDownFuture; import org.apache.activemq.leveldb.LevelDBStore; import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; -import org.apache.activemq.leveldb.util.FileSupport; import org.apache.activemq.store.MessageStore; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.TimeUnit; @@ -43,37 +35,9 @@ import static org.junit.Assert.*; /** */ -public class ElectingLevelDBStoreTest { - protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class); - - NIOServerCnxnFactory connector; - - static File data_dir() { - return new File("target/activemq-data/leveldb-elections"); - } +public class ElectingLevelDBStoreTest extends ZooKeeperTestSupport { - - @Before - public void setUp() throws Exception { - FileSupport.toRichFile(data_dir()).recursiveDelete(); - - System.out.println("Starting ZooKeeper"); - ZooKeeperServer zk_server = new ZooKeeperServer(); - zk_server.setTickTime(500); - zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data"))); - connector = new NIOServerCnxnFactory(); - connector.configure(new InetSocketAddress(0), 100); - connector.startup(zk_server); - System.out.println("ZooKeeper started"); - } - - @After - public void tearDown() throws Exception { - if( connector!=null ) { - connector.shutdown(); - connector = null; - } - } + protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class); @Test(timeout = 1000*60*60) public void testElection() throws Exception { @@ -210,50 +174,6 @@ public class ElectingLevelDBStoreTest { } } - static interface Task { - public void run() throws Exception; - } - - private void within(int time, TimeUnit unit, Task task) throws InterruptedException { - long timeMS = unit.toMillis(time); - long deadline = System.currentTimeMillis() + timeMS; - while (true) { - try { - task.run(); - return; - } catch (Throwable e) { - long remaining = deadline - System.currentTimeMillis(); - if( remaining <=0 ) { - if( e instanceof RuntimeException ) { - throw (RuntimeException)e; - } - if( e instanceof Error ) { - throw (Error)e; - } - throw new RuntimeException(e); - } - Thread.sleep(Math.min(timeMS/10, remaining)); - } - } - } - - private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException { - long deadline = System.currentTimeMillis()+timeout; - while( true ) { - for (CountDownFuture f:futures) { - if( f.await(1, TimeUnit.MILLISECONDS) ) { - return f; - } - } - long remaining = deadline - System.currentTimeMillis(); - if( remaining < 0 ) { - return null; - } else { - Thread.sleep(Math.min(remaining / 10, 100L)); - } - } - } - private CountDownFuture asyncStart(final Service service) { final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>(); LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java new file mode 100644 index 0000000..d9e44b8 --- /dev/null +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.leveldb.test; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; +import org.junit.After; +import org.junit.Test; + +import javax.jms.*; +import java.io.File; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Holds broker unit tests of the replicated leveldb store. + */ +public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { + + final SynchronousQueue<BrokerService> masterQueue = new SynchronousQueue<BrokerService>(); + ArrayList<BrokerService> brokers = new ArrayList<BrokerService>(); + + /** + * Tries to replicate the problem reported at: + * https://issues.apache.org/jira/browse/AMQ-4837 + */ + @Test(timeout = 1000*60*10) + public void testAMQ4837() throws Exception { + + // 1. Start 3 activemq nodes. + startBrokerAsync(createBrokerNode("node-1")); + startBrokerAsync(createBrokerNode("node-2")); + startBrokerAsync(createBrokerNode("node-3")); + + // 2. Push a message to the master and browse the queue + System.out.println("Wait for master to start up..."); + BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS); + assertNotNull("Master elected", master); + sendMessage(master, "Hello World #1"); + assertEquals(1, browseMessages(master).size()); + + // 3. Stop master node + System.out.println("Stopping master..."); + master.stop(); + master.waitUntilStopped(); + BrokerService prevMaster = master; + + // 4. Push a message to the new master (Node2) and browse the queue using the web UI. Message summary and queue content ok. + System.out.println("Wait for new master to start up..."); + master = masterQueue.poll(60, TimeUnit.SECONDS); + assertNotNull("Master elected", master); + sendMessage(master, "Hello World #2"); + assertEquals(2, browseMessages(master).size()); + + // 5. Start Node1 + System.out.println("Starting previous master..."); + prevMaster = createBrokerNode(prevMaster.getBrokerName()); + startBrokerAsync(prevMaster); + + // 6. Stop master node (Node2) + System.out.println("Stopping master..."); + master.stop(); + master.waitUntilStopped(); + + // 7. Browse the queue using the web UI on new master (Node3). Message summary ok however when clicking on the queue, no message details. + // An error (see below) is logged by the master, which attempts a restart. + System.out.println("Wait for new master to start up..."); + master = masterQueue.poll(60, TimeUnit.SECONDS); + assertNotNull("Master elected", master); + assertEquals(2, browseMessages(master).size()); + + } + + private void startBrokerAsync(BrokerService b) { + final BrokerService broker = b; + new Thread("Starting broker node: "+b.getBrokerName()){ + @Override + public void run() { + try { + broker.start(); + broker.waitUntilStarted(); + masterQueue.put(broker); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + } + + private void sendMessage(BrokerService brokerService, String body) throws Exception { + TransportConnector connector = brokerService.getTransportConnectors().get(0); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri()); + Connection connection = factory.createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("FOO")); + producer.send(session.createTextMessage(body)); + } finally { + connection.close(); + } + } + + private ArrayList<String> browseMessages(BrokerService brokerService) throws Exception { + ArrayList<String> rc = new ArrayList<String>(); + TransportConnector connector = brokerService.getTransportConnectors().get(0); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri()); + Connection connection = factory.createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(session.createQueue("FOO")); + Enumeration enumeration = browser.getEnumeration(); + while (enumeration.hasMoreElements()) { + TextMessage textMessage = (TextMessage) enumeration.nextElement(); + rc.add(textMessage.getText()); + } + } finally { + connection.close(); + } + return rc; + } + + @After + public void closeBrokers() throws Exception { + for (BrokerService broker : brokers) { + try { + broker.stop(); + broker.waitUntilStopped(); + } catch (Exception e) { + } + } + } + + private BrokerService createBrokerNode(String id) throws Exception { + BrokerService bs = new BrokerService(); + bs.getManagementContext().setCreateConnector(false); + brokers.add(bs); + bs.setBrokerName(id); + bs.setPersistenceAdapter(createStoreNode(id)); + bs.addConnector("tcp://0.0.0.0:0"); + return bs; + } + + + private ElectingLevelDBStore createStoreNode(String id) { + + // This little hack is in here because we give each of the 3 brokers + // different broker names so they can show up in JMX correctly, + // but the store needs to be configured with the same broker name + // so that they can find each other in ZK properly. + ElectingLevelDBStore store = new ElectingLevelDBStore() { + @Override + public void start() throws Exception { + this.setBrokerName("localhost"); + super.start(); + } + }; + store.setDirectory(new File(data_dir(), id)); + store.setReplicas(3); + store.setZkAddress("localhost:" + connector.getLocalPort()); + store.setHostname("localhost"); + store.setBind("tcp://0.0.0.0:0"); + return store; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java new file mode 100644 index 0000000..db65b43 --- /dev/null +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.leveldb.test; + +import org.apache.activemq.leveldb.CountDownFuture; +import org.apache.activemq.leveldb.util.FileSupport; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * Created by chirino on 10/30/13. + */ +public class ZooKeeperTestSupport { + + protected NIOServerCnxnFactory connector; + + static File data_dir() { + return new File("target/activemq-data/leveldb-elections"); + } + + + @Before + public void startZooKeeper() throws Exception { + FileSupport.toRichFile(data_dir()).recursiveDelete(); + + System.out.println("Starting ZooKeeper"); + ZooKeeperServer zk_server = new ZooKeeperServer(); + zk_server.setTickTime(500); + zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data"))); + connector = new NIOServerCnxnFactory(); + connector.configure(new InetSocketAddress(0), 100); + connector.startup(zk_server); + System.out.println("ZooKeeper started"); + } + + @After + public void stopZooKeeper() throws Exception { + if( connector!=null ) { + connector.shutdown(); + connector = null; + } + } + + + protected static interface Task { + public void run() throws Exception; + } + + protected void within(int time, TimeUnit unit, Task task) throws InterruptedException { + long timeMS = unit.toMillis(time); + long deadline = System.currentTimeMillis() + timeMS; + while (true) { + try { + task.run(); + return; + } catch (Throwable e) { + long remaining = deadline - System.currentTimeMillis(); + if( remaining <=0 ) { + if( e instanceof RuntimeException ) { + throw (RuntimeException)e; + } + if( e instanceof Error ) { + throw (Error)e; + } + throw new RuntimeException(e); + } + Thread.sleep(Math.min(timeMS/10, remaining)); + } + } + } + + protected CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException { + long deadline = System.currentTimeMillis()+timeout; + while( true ) { + for (CountDownFuture f:futures) { + if( f.await(1, TimeUnit.MILLISECONDS) ) { + return f; + } + } + long remaining = deadline - System.currentTimeMillis(); + if( remaining < 0 ) { + return null; + } else { + Thread.sleep(Math.min(remaining / 10, 100L)); + } + } + } +}
