http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java new file mode 100644 index 0000000..f687f45 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.server.util.OSMXBean; +import org.junit.Assert; +import org.junit.Test; + + +public class QuorumBase extends ClientBase { + private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class); + + private static final String LOCALADDR = "127.0.0.1"; + + File s1dir, s2dir, s3dir, s4dir, s5dir; + QuorumPeer s1, s2, s3, s4, s5; + protected int port1; + protected int port2; + protected int port3; + protected int port4; + protected int port5; + + protected int portLE1; + protected int portLE2; + protected int portLE3; + protected int portLE4; + protected int portLE5; + + protected int portClient1; + protected int portClient2; + protected int portClient3; + protected int portClient4; + protected int portClient5; + + protected boolean localSessionsEnabled = false; + protected boolean localSessionsUpgradingEnabled = false; + + @Test + // This just avoids complaints by junit + public void testNull() { + } + + @Override + public void setUp() throws Exception { + setUp(false); + } + + protected void setUp(boolean withObservers) throws Exception { + LOG.info("QuorumBase.setup " + getTestName()); + setupTestEnv(); + + JMXEnv.setUp(); + + setUpAll(); + + port1 = PortAssignment.unique(); + port2 = PortAssignment.unique(); + port3 = PortAssignment.unique(); + port4 = PortAssignment.unique(); + port5 = PortAssignment.unique(); + + portLE1 = PortAssignment.unique(); + portLE2 = PortAssignment.unique(); + portLE3 = PortAssignment.unique(); + portLE4 = PortAssignment.unique(); + portLE5 = PortAssignment.unique(); + + portClient1 = PortAssignment.unique(); + portClient2 = PortAssignment.unique(); + portClient3 = PortAssignment.unique(); + portClient4 = PortAssignment.unique(); + portClient5 = PortAssignment.unique(); + + hostPort = "127.0.0.1:" + portClient1 + + ",127.0.0.1:" + portClient2 + + ",127.0.0.1:" + portClient3 + + ",127.0.0.1:" + portClient4 + + ",127.0.0.1:" + portClient5; + LOG.info("Ports are: " + hostPort); + + s1dir = ClientBase.createTmpDir(); + s2dir = ClientBase.createTmpDir(); + s3dir = ClientBase.createTmpDir(); + s4dir = ClientBase.createTmpDir(); + s5dir = ClientBase.createTmpDir(); + + startServers(withObservers); + + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix() == true) { + LOG.info("Initial fdcount is: " + + osMbean.getOpenFileDescriptorCount()); + } + + LOG.info("Setup finished"); + } + + void startServers() throws Exception { + startServers(false); + } + + void startServers(boolean withObservers) throws Exception { + int tickTime = 2000; + int initLimit = 3; + int syncLimit = 3; + HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(); + peers.put(Long.valueOf(1), new QuorumServer(1, + new InetSocketAddress(LOCALADDR, port1), + new InetSocketAddress(LOCALADDR, portLE1), + new InetSocketAddress(LOCALADDR, portClient1), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(2), new QuorumServer(2, + new InetSocketAddress(LOCALADDR, port2), + new InetSocketAddress(LOCALADDR, portLE2), + new InetSocketAddress(LOCALADDR, portClient2), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(3), new QuorumServer(3, + new InetSocketAddress(LOCALADDR, port3), + new InetSocketAddress(LOCALADDR, portLE3), + new InetSocketAddress(LOCALADDR, portClient3), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(4), new QuorumServer(4, + new InetSocketAddress(LOCALADDR, port4), + new InetSocketAddress(LOCALADDR, portLE4), + new InetSocketAddress(LOCALADDR, portClient4), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(5), new QuorumServer(5, + new InetSocketAddress(LOCALADDR, port5), + new InetSocketAddress(LOCALADDR, portLE5), + new InetSocketAddress(LOCALADDR, portClient5), + LearnerType.PARTICIPANT)); + + if (withObservers) { + peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER; + peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER; + } + + LOG.info("creating QuorumPeer 1 port " + portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient1, s1.getClientPort()); + LOG.info("creating QuorumPeer 2 port " + portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient2, s2.getClientPort()); + LOG.info("creating QuorumPeer 3 port " + portClient3); + s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient3, s3.getClientPort()); + LOG.info("creating QuorumPeer 4 port " + portClient4); + s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient4, s4.getClientPort()); + LOG.info("creating QuorumPeer 5 port " + portClient5); + s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient5, s5.getClientPort()); + + if (withObservers) { + s4.setLearnerType(LearnerType.OBSERVER); + s5.setLearnerType(LearnerType.OBSERVER); + } + + LOG.info("QuorumPeer 1 voting view: " + s1.getVotingView()); + LOG.info("QuorumPeer 2 voting view: " + s2.getVotingView()); + LOG.info("QuorumPeer 3 voting view: " + s3.getVotingView()); + LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView()); + LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView()); + + s1.enableLocalSessions(localSessionsEnabled); + s2.enableLocalSessions(localSessionsEnabled); + s3.enableLocalSessions(localSessionsEnabled); + s4.enableLocalSessions(localSessionsEnabled); + s5.enableLocalSessions(localSessionsEnabled); + s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + + LOG.info("start QuorumPeer 1"); + s1.start(); + LOG.info("start QuorumPeer 2"); + s2.start(); + LOG.info("start QuorumPeer 3"); + s3.start(); + LOG.info("start QuorumPeer 4"); + s4.start(); + LOG.info("start QuorumPeer 5"); + s5.start(); + LOG.info("started QuorumPeer 5"); + + LOG.info ("Checking ports " + hostPort); + for (String hp : hostPort.split(",")) { + Assert.assertTrue("waiting for server up", + ClientBase.waitForServerUp(hp, + CONNECTION_TIMEOUT)); + LOG.info(hp + " is accepting client connections"); + } + + // interesting to see what's there... + JMXEnv.dump(); + // make sure we have these 5 servers listed + Set<String> ensureNames = new LinkedHashSet<String>(); + for (int i = 1; i <= 5; i++) { + ensureNames.add("InMemoryDataTree"); + } + for (int i = 1; i <= 5; i++) { + ensureNames.add("name0=ReplicatedServer_id" + i + + ",name1=replica." + i + ",name2="); + } + for (int i = 1; i <= 5; i++) { + for (int j = 1; j <= 5; j++) { + ensureNames.add("name0=ReplicatedServer_id" + i + + ",name1=replica." + j); + } + } + for (int i = 1; i <= 5; i++) { + ensureNames.add("name0=ReplicatedServer_id" + i); + } + JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); + } + + public int getLeaderIndex() { + if (s1.getPeerState() == ServerState.LEADING) { + return 0; + } else if (s2.getPeerState() == ServerState.LEADING) { + return 1; + } else if (s3.getPeerState() == ServerState.LEADING) { + return 2; + } else if (s4.getPeerState() == ServerState.LEADING) { + return 3; + } else if (s5.getPeerState() == ServerState.LEADING) { + return 4; + } + return -1; + } + + public String getPeersMatching(ServerState state) { + StringBuilder hosts = new StringBuilder(); + for (QuorumPeer p : getPeerList()) { + if (p.getPeerState() == state) { + hosts.append(String.format("%s:%d,", LOCALADDR, p.getClientAddress().getPort())); + } + } + LOG.info("getPeersMatching ports are {}", hosts); + return hosts.toString(); + } + + public ArrayList<QuorumPeer> getPeerList() { + ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>(); + peers.add(s1); + peers.add(s2); + peers.add(s3); + peers.add(s4); + peers.add(s5); + return peers; + } + + public void setupServers() throws IOException { + setupServer(1); + setupServer(2); + setupServer(3); + setupServer(4); + setupServer(5); + } + + HashMap<Long,QuorumServer> peers = null; + public void setupServer(int i) throws IOException { + int tickTime = 2000; + int initLimit = 3; + int syncLimit = 3; + + if(peers == null){ + peers = new HashMap<Long,QuorumServer>(); + + peers.put(Long.valueOf(1), new QuorumServer(1, + new InetSocketAddress(LOCALADDR, port1), + new InetSocketAddress(LOCALADDR, portLE1), + new InetSocketAddress(LOCALADDR, portClient1), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(2), new QuorumServer(2, + new InetSocketAddress(LOCALADDR, port2), + new InetSocketAddress(LOCALADDR, portLE2), + new InetSocketAddress(LOCALADDR, portClient2), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(3), new QuorumServer(3, + new InetSocketAddress(LOCALADDR, port3), + new InetSocketAddress(LOCALADDR, portLE3), + new InetSocketAddress(LOCALADDR, portClient3), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(4), new QuorumServer(4, + new InetSocketAddress(LOCALADDR, port4), + new InetSocketAddress(LOCALADDR, portLE4), + new InetSocketAddress(LOCALADDR, portClient4), + LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(5), new QuorumServer(5, + new InetSocketAddress(LOCALADDR, port5), + new InetSocketAddress(LOCALADDR, portLE5), + new InetSocketAddress(LOCALADDR, portClient5), + LearnerType.PARTICIPANT)); + } + + switch(i){ + case 1: + LOG.info("creating QuorumPeer 1 port " + portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient1, s1.getClientPort()); + break; + case 2: + LOG.info("creating QuorumPeer 2 port " + portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient2, s2.getClientPort()); + break; + case 3: + LOG.info("creating QuorumPeer 3 port " + portClient3); + s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient3, s3.getClientPort()); + break; + case 4: + LOG.info("creating QuorumPeer 4 port " + portClient4); + s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient4, s4.getClientPort()); + break; + case 5: + LOG.info("creating QuorumPeer 5 port " + portClient5); + s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit); + Assert.assertEquals(portClient5, s5.getClientPort()); + } + } + + @Override + public void tearDown() throws Exception { + LOG.info("TearDown started"); + + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix() == true) { + LOG.info("fdcount after test is: " + + osMbean.getOpenFileDescriptorCount()); + } + + shutdownServers(); + + for (String hp : hostPort.split(",")) { + Assert.assertTrue("waiting for server down", + ClientBase.waitForServerDown(hp, + ClientBase.CONNECTION_TIMEOUT)); + LOG.info(hp + " is no longer accepting client connections"); + } + + JMXEnv.tearDown(); + } + public void shutdownServers() { + shutdown(s1); + shutdown(s2); + shutdown(s3); + shutdown(s4); + shutdown(s5); + } + + public static void shutdown(QuorumPeer qp) { + if (qp == null) { + return; + } + try { + LOG.info("Shutting down quorum peer " + qp.getName()); + qp.shutdown(); + Election e = qp.getElectionAlg(); + if (e != null) { + LOG.info("Shutting down leader election " + qp.getName()); + e.shutdown(); + } else { + LOG.info("No election available to shutdown " + qp.getName()); + } + LOG.info("Waiting for " + qp.getName() + " to exit thread"); + long readTimeout = qp.getTickTime() * qp.getInitLimit(); + long connectTimeout = qp.getTickTime() * qp.getSyncLimit(); + long maxTimeout = Math.max(readTimeout, connectTimeout); + maxTimeout = Math.max(maxTimeout, ClientBase.CONNECTION_TIMEOUT); + qp.join(maxTimeout * 2); + if (qp.isAlive()) { + Assert.fail("QP failed to shutdown in " + (maxTimeout * 2) + " seconds: " + qp.getName()); + } + } catch (InterruptedException e) { + LOG.debug("QP interrupted: " + qp.getName(), e); + } + } + + protected TestableZooKeeper createClient() + throws IOException, InterruptedException + { + return createClient(hostPort); + } + + protected TestableZooKeeper createClient(String hp) + throws IOException, InterruptedException + { + CountdownWatcher watcher = new CountdownWatcher(); + return createClient(watcher, hp); + } + + protected TestableZooKeeper createClient(CountdownWatcher watcher, ServerState state) + throws IOException, InterruptedException + { + return createClient(watcher, getPeersMatching(state)); + } +}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java new file mode 100644 index 0000000..e5b377e --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumHammerTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; +import org.apache.zookeeper.ZKTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuorumHammerTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(QuorumHammerTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + protected final QuorumBase qb = new QuorumBase(); + protected final ClientHammerTest cht = new ClientHammerTest(); + + @Before + public void setUp() throws Exception { + qb.setUp(); + cht.hostPort = qb.hostPort; + cht.setUpAll(); + } + + @After + public void tearDown() throws Exception { + cht.tearDownAll(); + qb.tearDown(); + } + + @Test + public void testHammerBasic() throws Throwable { + cht.testHammerBasic(); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java new file mode 100644 index 0000000..6966626 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; +import java.util.ArrayList; + +import org.apache.zookeeper.jmx.CommonNames; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuorumMajorityTest extends QuorumBase { + protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + /***************************************************************/ + /* Test that the majority quorum verifier only counts votes from */ + /* followers in its view */ + /***************************************************************/ + @Test + public void testMajQuorums() throws Throwable { + LOG.info("Verify QuorumPeer#electionTimeTaken jmx bean attribute"); + + ArrayList<QuorumPeer> peers = getPeerList(); + for (int i = 1; i <= peers.size(); i++) { + QuorumPeer qp = peers.get(i - 1); + Long electionTimeTaken = -1L; + String bean = ""; + if (qp.getPeerState() == ServerState.FOLLOWING) { + bean = String.format( + "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Follower", + CommonNames.DOMAIN, i, i); + } else if (qp.getPeerState() == ServerState.LEADING) { + bean = String.format( + "%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Leader", + CommonNames.DOMAIN, i, i); + } + electionTimeTaken = (Long) JMXEnv.ensureBeanAttribute(bean, + "ElectionTimeTaken"); + Assert.assertTrue("Wrong electionTimeTaken value!", + electionTimeTaken >= 0); + } + + //setup servers 1-5 to be followers + setUp(false); + + Proposal p = new Proposal(); + + p.addQuorumVerifier(s1.getQuorumVerifier()); + + // 2 followers out of 5 is not a majority + p.addAck(Long.valueOf(1)); + p.addAck(Long.valueOf(2)); + Assert.assertEquals(false, p.hasAllQuorums()); + + // 6 is not in the view - its vote shouldn't count + p.addAck(Long.valueOf(6)); + Assert.assertEquals(false, p.hasAllQuorums()); + + // 3 followers out of 5 are a majority of the voting view + p.addAck(Long.valueOf(3)); + Assert.assertEquals(true, p.hasAllQuorums()); + + //setup servers 1-3 to be followers and 4 and 5 to be observers + setUp(true); + + p = new Proposal(); + p.addQuorumVerifier(s1.getQuorumVerifier()); + + // 1 follower out of 3 is not a majority + p.addAck(Long.valueOf(1)); + Assert.assertEquals(false, p.hasAllQuorums()); + + // 4 and 5 are observers, their vote shouldn't count + p.addAck(Long.valueOf(4)); + p.addAck(Long.valueOf(5)); + Assert.assertEquals(false, p.hasAllQuorums()); + + // 6 is not in the view - its vote shouldn't count + p.addAck(Long.valueOf(6)); + Assert.assertEquals(false, p.hasAllQuorums()); + + // 2 followers out of 3 are a majority of the voting view + p.addAck(Long.valueOf(2)); + Assert.assertEquals(true, p.hasAllQuorums()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java new file mode 100644 index 0000000..415bb7d --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Quotas; +import org.apache.zookeeper.StatsTrack; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeperMain; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; +import org.junit.Test; + +public class QuorumQuotaTest extends QuorumBase { + + @Test + public void testQuotaWithQuorum() throws Exception { + ZooKeeper zk = createClient(); + zk.setData("/", "some".getBytes(), -1); + zk.create("/a", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + int i = 0; + for (i=0; i < 300;i++) { + zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + ZooKeeperMain.createQuota(zk, "/a", 1000L, 5000); + String statPath = Quotas.quotaZookeeper + "/a"+ "/" + Quotas.statNode; + byte[] data = zk.getData(statPath, false, new Stat()); + StatsTrack st = new StatsTrack(new String(data)); + Assert.assertTrue("bytes are set", st.getBytes() == 1204L); + Assert.assertTrue("num count is set", st.getCount() == 301); + for (i=300; i < 600; i++) { + zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + data = zk.getData(statPath, false, new Stat()); + st = new StatsTrack(new String(data)); + Assert.assertTrue("bytes are set", st.getBytes() == 2404L); + Assert.assertTrue("num count is set", st.getCount() == 601); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java new file mode 100644 index 0000000..cd1d153 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuorumTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(QuorumTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + private final QuorumBase qb = new QuorumBase(); + private final ClientTest ct = new ClientTest(); + private QuorumUtil qu; + + @Before + public void setUp() throws Exception { + qb.setUp(); + ct.hostPort = qb.hostPort; + ct.setUpAll(); + } + + @After + public void tearDown() throws Exception { + ct.tearDownAll(); + qb.tearDown(); + if (qu != null) { + qu.tearDown(); + } + } + + @Test + public void testDeleteWithChildren() throws Exception { + ct.testDeleteWithChildren(); + } + + @Test + public void testPing() throws Exception { + ct.testPing(); + } + + @Test + public void testSequentialNodeNames() + throws IOException, InterruptedException, KeeperException + { + ct.testSequentialNodeNames(); + } + + @Test + public void testACLs() throws Exception { + ct.testACLs(); + } + + @Test + public void testClientwithoutWatcherObj() throws IOException, + InterruptedException, KeeperException + { + ct.testClientwithoutWatcherObj(); + } + + @Test + public void testClientWithWatcherObj() throws IOException, + InterruptedException, KeeperException + { + ct.testClientWithWatcherObj(); + } + + @Test + public void testGetView() { + Assert.assertEquals(5,qb.s1.getView().size()); + Assert.assertEquals(5,qb.s2.getView().size()); + Assert.assertEquals(5,qb.s3.getView().size()); + Assert.assertEquals(5,qb.s4.getView().size()); + Assert.assertEquals(5,qb.s5.getView().size()); + } + + @Test + public void testViewContains() { + // Test view contains self + Assert.assertTrue(qb.s1.viewContains(qb.s1.getId())); + + // Test view contains other servers + Assert.assertTrue(qb.s1.viewContains(qb.s2.getId())); + + // Test view does not contain non-existant servers + Assert.assertFalse(qb.s1.viewContains(-1L)); + } + + volatile int counter = 0; + volatile int errors = 0; + @Test + public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException { + ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Leader leader = qb.s1.leader; + if (leader == null) leader = qb.s2.leader; + if (leader == null) leader = qb.s3.leader; + if (leader == null) leader = qb.s4.leader; + if (leader == null) leader = qb.s5.leader; + Assert.assertNotNull(leader); + for(int i = 0; i < 5000; i++) { + zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() { + public void processResult(int rc, String path, Object ctx, + Stat stat) { + counter++; + if (rc != 0) { + errors++; + } + } + }, null); + } + for(LearnerHandler f : leader.getForwardingFollowers()) { + f.getSocket().shutdownInput(); + } + for(int i = 0; i < 5000; i++) { + zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() { + public void processResult(int rc, String path, Object ctx, + Stat stat) { + counter++; + if (rc != 0) { + errors++; + } + } + }, null); + } + // check if all the followers are alive + Assert.assertTrue(qb.s1.isAlive()); + Assert.assertTrue(qb.s2.isAlive()); + Assert.assertTrue(qb.s3.isAlive()); + Assert.assertTrue(qb.s4.isAlive()); + Assert.assertTrue(qb.s5.isAlive()); + zk.close(); + } + + @Test + public void testMultipleWatcherObjs() throws IOException, + InterruptedException, KeeperException + { + ct.testMutipleWatcherObjs(); + } + + /** + * Make sure that we can change sessions + * from follower to leader. + * + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testSessionMoved() throws Exception { + String hostPorts[] = qb.hostPort.split(","); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], + ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + // we want to loop through the list twice + for(int i = 0; i < hostPorts.length*2; i++) { + zk.dontReconnect(); + // This should stomp the zk handle + DisconnectableZooKeeper zknew = + new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], + ClientBase.CONNECTION_TIMEOUT, + new Watcher() {public void process(WatchedEvent event) { + }}, + zk.getSessionId(), + zk.getSessionPasswd()); + zknew.setData("/", new byte[1], -1); + final int result[] = new int[1]; + result[0] = Integer.MAX_VALUE; + zknew.sync("/", new AsyncCallback.VoidCallback() { + public void processResult(int rc, String path, Object ctx) { + synchronized(result) { result[0] = rc; result.notify(); } + } + }, null); + synchronized(result) { + if(result[0] == Integer.MAX_VALUE) { + result.wait(5000); + } + } + LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]); + Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue()); + try { + zk.setData("/", new byte[1], -1); + Assert.fail("Should have lost the connection"); + } catch(KeeperException.ConnectionLossException e) { + } + zk = zknew; + } + zk.close(); + } + + private static class DiscoWatcher implements Watcher { + volatile boolean zkDisco = false; + public void process(WatchedEvent event) { + if (event.getState() == KeeperState.Disconnected) { + zkDisco = true; + } + } + } + + /** + * Connect to two different servers with two different handles using the same session and + * make sure we cannot do any changes. + */ + @Test + @Ignore + public void testSessionMove() throws Exception { + String hps[] = qb.hostPort.split(","); + DiscoWatcher oldWatcher = new DiscoWatcher(); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0], + ClientBase.CONNECTION_TIMEOUT, oldWatcher); + zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zk.dontReconnect(); + // This should stomp the zk handle + DiscoWatcher watcher = new DiscoWatcher(); + DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1], + ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(), + zk.getSessionPasswd()); + zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + try { + zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + Assert.fail("Should have lost the connection"); + } catch(KeeperException.ConnectionLossException e) { + // wait up to 30 seconds for the disco to be delivered + for (int i = 0; i < 30; i++) { + if (oldWatcher.zkDisco) { + break; + } + Thread.sleep(1000); + } + Assert.assertTrue(oldWatcher.zkDisco); + } + + ArrayList<ZooKeeper> toClose = new ArrayList<ZooKeeper>(); + toClose.add(zknew); + // Let's just make sure it can still move + for(int i = 0; i < 10; i++) { + zknew.dontReconnect(); + zknew = new DisconnectableZooKeeper(hps[1], + ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(), + zk.getSessionId(), zk.getSessionPasswd()); + toClose.add(zknew); + zknew.create("/t-"+i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + for (ZooKeeper z: toClose) { + z.close(); + } + zk.close(); + } + + /** + * See ZOOKEEPER-790 for details + * */ + @Test + public void testFollowersStartAfterLeader() throws Exception { + qu = new QuorumUtil(1); + CountdownWatcher watcher = new CountdownWatcher(); + qu.startQuorum(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; + + // break the quorum + qu.shutdown(index); + + // try to reestablish the quorum + qu.start(index); + + // Connect the client after services are restarted (otherwise we would get + // SessionExpiredException as the previous local session was not persisted). + ZooKeeper zk = new ZooKeeper( + "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, watcher); + + try{ + watcher.waitForConnected(CONNECTION_TIMEOUT); + } catch(TimeoutException e) { + Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); + } + + zk.close(); + } + + // skip superhammer and clientcleanup as they are too expensive for quorum + + /** + * Tests if a multiop submitted to a non-leader propagates to the leader properly + * (see ZOOKEEPER-1124). + * + * The test works as follows. It has a client connect to a follower and submit a multiop + * to the follower. It then verifies that the multiop successfully gets committed by the leader. + * + * Without the fix in ZOOKEEPER-1124, this fails with a ConnectionLoss KeeperException. + */ + @Test + public void testMultiToFollower() throws Exception { + qu = new QuorumUtil(1); + CountdownWatcher watcher = new CountdownWatcher(); + qu.startQuorum(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; + + ZooKeeper zk = new ZooKeeper( + "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + zk.multi(Arrays.asList( + Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), + Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), + Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + )); + zk.getData("/multi0", false, null); + zk.getData("/multi1", false, null); + zk.getData("/multi2", false, null); + + zk.close(); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java new file mode 100644 index 0000000..314171d --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.util.OSMXBean; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all + * peers, particular peer, n peers etc. + */ +public class QuorumUtil { + + // TODO partitioning of peers and clients + + // TODO refactor QuorumBase to be special case of this + + private static final Logger LOG = LoggerFactory.getLogger(QuorumUtil.class); + + public static class PeerStruct { + public int id; + public QuorumPeer peer; + public File dataDir; + public int clientPort; + } + + private final Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>(); + + private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>(); + + public final int N; + + public final int ALL; + + private String hostPort; + + private int tickTime; + + private int initLimit; + + private int syncLimit; + + private int electionAlg; + + private boolean localSessionEnabled; + + /** + * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble. + * + * @param n + * number of peers in the ensemble will be 2n+1 + */ + public QuorumUtil(int n, int syncLimit) throws RuntimeException { + try { + ClientBase.setupTestEnv(); + JMXEnv.setUp(); + + N = n; + ALL = 2 * N + 1; + tickTime = 2000; + initLimit = 3; + this.syncLimit = syncLimit; + electionAlg = 3; + hostPort = ""; + + for (int i = 1; i <= ALL; ++i) { + PeerStruct ps = new PeerStruct(); + ps.id = i; + ps.dataDir = ClientBase.createTmpDir(); + ps.clientPort = PortAssignment.unique(); + peers.put(i, ps); + + peersView.put(Long.valueOf(i), new QuorumServer(i, + new InetSocketAddress("127.0.0.1", PortAssignment.unique()), + new InetSocketAddress("127.0.0.1", PortAssignment.unique()), + new InetSocketAddress("127.0.0.1", ps.clientPort), + LearnerType.PARTICIPANT)); + hostPort += "127.0.0.1:" + ps.clientPort + ((i == ALL) ? "" : ","); + } + for (int i = 1; i <= ALL; ++i) { + PeerStruct ps = peers.get(i); + LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, + electionAlg, ps.id, tickTime, initLimit, syncLimit); + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public QuorumUtil(int n) throws RuntimeException { + this(n, 3); + } + + public PeerStruct getPeer(int id) { + return peers.get(id); + } + + // This was added to avoid running into the problem of ZOOKEEPER-1539 + public boolean disableJMXTest = false; + + + public void enableLocalSession(boolean localSessionEnabled) { + this.localSessionEnabled = localSessionEnabled; + } + + public void startAll() throws IOException { + shutdownAll(); + for (int i = 1; i <= ALL; ++i) { + start(i); + LOG.info("Started QuorumPeer " + i); + } + + LOG.info("Checking ports " + hostPort); + for (String hp : hostPort.split(",")) { + Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp, + ClientBase.CONNECTION_TIMEOUT)); + LOG.info(hp + " is accepting client connections"); + } + + // This was added to avoid running into the problem of ZOOKEEPER-1539 + if (disableJMXTest) return; + + // interesting to see what's there... + try { + JMXEnv.dump(); + // make sure we have all servers listed + Set<String> ensureNames = new LinkedHashSet<String>(); + for (int i = 1; i <= ALL; ++i) { + ensureNames.add("InMemoryDataTree"); + } + for (int i = 1; i <= ALL; ++i) { + ensureNames + .add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2="); + } + for (int i = 1; i <= ALL; ++i) { + for (int j = 1; j <= ALL; ++j) { + ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j); + } + } + for (int i = 1; i <= ALL; ++i) { + ensureNames.add("name0=ReplicatedServer_id" + i); + } + JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); + } catch (IOException e) { + LOG.warn("IOException during JMXEnv operation", e); + } catch (InterruptedException e) { + LOG.warn("InterruptedException during JMXEnv operation", e); + } + } + + /** + * Start first N+1 peers. + */ + public void startQuorum() throws IOException { + shutdownAll(); + for (int i = 1; i <= N + 1; ++i) { + start(i); + } + for (int i = 1; i <= N + 1; ++i) { + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(i).clientPort, ClientBase.CONNECTION_TIMEOUT)); + } + } + + public void start(int id) throws IOException { + PeerStruct ps = getPeer(id); + LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, + ps.id, tickTime, initLimit, syncLimit); + if (localSessionEnabled) { + ps.peer.enableLocalSessions(true); + } + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + + ps.peer.start(); + } + + public void restart(int id) throws IOException { + start(id); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT)); + } + + public void startThenShutdown(int id) throws IOException { + PeerStruct ps = getPeer(id); + LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, + ps.id, tickTime, initLimit, syncLimit); + if (localSessionEnabled) { + ps.peer.enableLocalSessions(true); + } + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + + ps.peer.start(); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT)); + shutdown(id); + } + + public void shutdownAll() { + for (int i = 1; i <= ALL; ++i) { + shutdown(i); + } + for (String hp : hostPort.split(",")) { + Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown(hp, + ClientBase.CONNECTION_TIMEOUT)); + LOG.info(hp + " is no longer accepting client connections"); + } + } + + public void shutdown(int id) { + QuorumPeer qp = getPeer(id).peer; + try { + LOG.info("Shutting down quorum peer " + qp.getName()); + qp.shutdown(); + Election e = qp.getElectionAlg(); + if (e != null) { + LOG.info("Shutting down leader election " + qp.getName()); + e.shutdown(); + } else { + LOG.info("No election available to shutdown " + qp.getName()); + } + LOG.info("Waiting for " + qp.getName() + " to exit thread"); + qp.join(30000); + if (qp.isAlive()) { + Assert.fail("QP failed to shutdown in 30 seconds: " + qp.getName()); + } + } catch (InterruptedException e) { + LOG.debug("QP interrupted: " + qp.getName(), e); + } + } + + public String getConnString() { + return hostPort; + } + + public String getConnectString(QuorumPeer peer) { + return "127.0.0.1:" + peer.getClientPort(); + } + + public QuorumPeer getLeaderQuorumPeer() { + for (PeerStruct ps: peers.values()) { + if (ps.peer.leader != null) { + return ps.peer; + } + } + throw new RuntimeException("Unable to find a leader peer"); + } + + public List<QuorumPeer> getFollowerQuorumPeers() { + List<QuorumPeer> peerList = new ArrayList<QuorumPeer>(ALL - 1); + + for (PeerStruct ps: peers.values()) { + if (ps.peer.leader == null) { + peerList.add(ps.peer); + } + } + + return Collections.unmodifiableList(peerList); + } + + public void tearDown() throws Exception { + LOG.info("TearDown started"); + + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix() == true) { + LOG.info("fdcount after test is: " + osMbean.getOpenFileDescriptorCount()); + } + + shutdownAll(); + JMXEnv.tearDown(); + } + + public int getLeaderServer() { + int index = 0; + for (int i = 1; i <= ALL; i++) { + if (getPeer(i).peer.leader != null) { + index = i; + break; + } + } + + Assert.assertTrue("Leader server not found.", index > 0); + return index; + } + + public String getConnectionStringForServer(final int index) { + return "127.0.0.1:" + getPeer(index).clientPort; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java new file mode 100644 index 0000000..76e6df0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtilTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or morecontributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.test; + +import java.io.IOException; +import java.util.Set; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is intented to ensure the correct functionality of + * {@link QuorumUtil} helper. + */ +public class QuorumUtilTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory + .getLogger(QuorumUtilTest.class); + + /** + * <p> + * This test ensures that all JXM beans associated to a {@link QuorumPeer} + * are unregistered when shuted down ({@link QuorumUtil#shutdown(int)}). It + * allows a successfull restarting of several zookeeper servers ( + * {@link QuorumPeer}) running on the same JVM. + * <p> + * See ZOOKEEPER-1214 for details. + */ + @Test + public void validateAllMXBeanAreUnregistered() throws IOException { + QuorumUtil qU = new QuorumUtil(1); + LOG.info(">-->> Starting up all servers..."); + qU.startAll(); + LOG.info(">-->> Servers up and running..."); + + int leaderIndex = qU.getLeaderServer(); + int firstFollowerIndex = 0; + int secondFollowerIndex = 0; + + switch (leaderIndex) { + case 1: + firstFollowerIndex = 2; + secondFollowerIndex = 3; + break; + case 2: + firstFollowerIndex = 1; + secondFollowerIndex = 3; + break; + case 3: + firstFollowerIndex = 1; + secondFollowerIndex = 2; + break; + + default: + Assert.fail("Unexpected leaderIndex value: " + leaderIndex); + break; + } + + LOG.info(">-->> Shuting down server [{}]", firstFollowerIndex); + qU.shutdown(firstFollowerIndex); + LOG.info(">-->> Shuting down server [{}]", secondFollowerIndex); + qU.shutdown(secondFollowerIndex); + LOG.info(">-->> Restarting server [{}]", firstFollowerIndex); + qU.restart(firstFollowerIndex); + LOG.info(">-->> Restarting server [{}]", secondFollowerIndex); + qU.restart(secondFollowerIndex); + + qU.shutdownAll(); + Set<ZKMBeanInfo> pending = MBeanRegistry.getInstance() + .getRegisteredBeans(); + Assert.assertTrue("The following beans should have been unregistered: " + + pending, pending.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java new file mode 100644 index 0000000..6e46edc --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumZxidSyncTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class QuorumZxidSyncTest extends ZKTestCase { + QuorumBase qb = new QuorumBase(); + + @Before + public void setUp() throws Exception { + qb.setUp(); + } + + /** + * find out what happens when a follower connects to leader that is behind + */ + @Test + public void testBehindLeader() throws Exception { + // crank up the epoch numbers + ClientBase.waitForServerUp(qb.hostPort, 10000); + ClientBase.waitForServerUp(qb.hostPort, 10000); + ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + deleteFiles(qb.s1dir); + deleteFiles(qb.s2dir); + deleteFiles(qb.s3dir); + deleteFiles(qb.s4dir); + qb.setupServers(); + qb.s1.start(); + qb.s2.start(); + qb.s3.start(); + qb.s4.start(); + Assert.assertTrue("Servers didn't come up", ClientBase.waitForServerUp(qb.hostPort, 10000)); + qb.s5.start(); + String hostPort = "127.0.0.1:" + qb.s5.getClientPort(); + Assert.assertFalse("Servers came up, but shouldn't have since it's ahead of leader", + ClientBase.waitForServerUp(hostPort, 10000)); + } + + private void deleteFiles(File f) { + File v = new File(f, "version-2"); + for(File c: v.listFiles()) { + c.delete(); + } + } + + /** + * find out what happens when the latest state is in the snapshots not + * the logs. + */ + @Test + public void testLateLogs() throws Exception { + // crank up the epoch numbers + ClientBase.waitForServerUp(qb.hostPort, 10000); + ClientBase.waitForServerUp(qb.hostPort, 10000); + ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.shutdownServers(); + deleteLogs(qb.s1dir); + deleteLogs(qb.s2dir); + deleteLogs(qb.s3dir); + deleteLogs(qb.s4dir); + deleteLogs(qb.s5dir); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + boolean saw2 = false; + for(String child: zk.getChildren("/", false)) { + if (child.equals("2")) { + saw2 = true; + } + } + zk.close(); + Assert.assertTrue("Didn't see /2 (went back in time)", saw2); + } + + private void deleteLogs(File f) { + File v = new File(f, "version-2"); + for(File c: v.listFiles()) { + if (c.getName().startsWith("log")) { + c.delete(); + } + } + } + + @After + public void tearDown() throws Exception { + qb.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java new file mode 100644 index 0000000..68c7182 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.ByteArrayOutputStream; +import java.io.LineNumberReader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + + +import org.apache.log4j.Layout; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.WriterAppender; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NotReadOnlyException; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +public class ReadOnlyModeTest extends ZKTestCase { + private static final org.slf4j.Logger LOG = LoggerFactory + .getLogger(ReadOnlyModeTest.class); + private static int CONNECTION_TIMEOUT = QuorumBase.CONNECTION_TIMEOUT; + private QuorumUtil qu = new QuorumUtil(1); + + @Before + public void setUp() throws Exception { + System.setProperty("readonlymode.enabled", "true"); + qu.startQuorum(); + } + + @After + public void tearDown() throws Exception { + System.setProperty("readonlymode.enabled", "false"); + qu.tearDown(); + } + + /** + * Test write operations using multi request. + */ + @Test(timeout = 90000) + public void testMultiTransaction() throws Exception { + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected + + final String data = "Data to be read in RO mode"; + final String node1 = "/tnode1"; + final String node2 = "/tnode2"; + zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + watcher.reset(); + qu.shutdown(2); + watcher.waitForConnected(CONNECTION_TIMEOUT); + Assert.assertEquals("Should be in r-o mode", States.CONNECTEDREADONLY, + zk.getState()); + + // read operation during r/o mode + String remoteData = new String(zk.getData(node1, false, null)); + Assert.assertEquals("Failed to read data in r-o mode", data, remoteData); + + try { + Transaction transaction = zk.transaction(); + transaction.setData(node1, "no way".getBytes(), -1); + transaction.create(node2, data.getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + transaction.commit(); + Assert.fail("Write operation using multi-transaction" + + " api has succeeded during RO mode"); + } catch (NotReadOnlyException e) { + // ok + } + + Assert.assertNull("Should have created the znode:" + node2, + zk.exists(node2, false)); + } + + /** + * Basic test of read-only client functionality. Tries to read and write + * during read-only mode, then regains a quorum and tries to write again. + */ + @Test(timeout = 90000) + public void testReadOnlyClient() throws Exception { + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected + + final String data = "Data to be read in RO mode"; + final String node = "/tnode"; + zk.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + watcher.reset(); + qu.shutdown(2); + zk.close(); + + // Re-connect the client (in case we were connected to the shut down + // server and the local session was not persisted). + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // read operation during r/o mode + String remoteData = new String(zk.getData(node, false, null)); + Assert.assertEquals(data, remoteData); + + try { + zk.setData(node, "no way".getBytes(), -1); + Assert.fail("Write operation has succeeded during RO mode"); + } catch (NotReadOnlyException e) { + // ok + } + + watcher.reset(); + qu.start(2); + Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp( + "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT)); + zk.close(); + watcher.reset(); + + // Re-connect the client (in case we were connected to the shut down + // server and the local session was not persisted). + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); + zk.setData(node, "We're in the quorum now".getBytes(), -1); + + zk.close(); + } + + /** + * Ensures that upon connection to a read-only server client receives + * ConnectedReadOnly state notification. + */ + @Test(timeout = 90000) + public void testConnectionEvents() throws Exception { + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + boolean success = false; + for (int i = 0; i < 30; i++) { + try { + zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + success=true; + break; + } catch(KeeperException.ConnectionLossException e) { + Thread.sleep(1000); + } + } + Assert.assertTrue("Did not succeed in connecting in 30s", success); + Assert.assertFalse("The connection should not be read-only yet", watcher.readOnlyConnected); + + // kill peer and wait no more than 5 seconds for read-only server + // to be started (which should take one tickTime (2 seconds)) + qu.shutdown(2); + + // Re-connect the client (in case we were connected to the shut down + // server and the local session was not persisted). + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); + long start = Time.currentElapsedTime(); + while (!(zk.getState() == States.CONNECTEDREADONLY)) { + Thread.sleep(200); + // FIXME this was originally 5 seconds, but realistically, on random/slow/virt hosts, there is no way to guarantee this + Assert.assertTrue("Can't connect to the server", + Time.currentElapsedTime() - start < 30000); + } + + watcher.waitForReadOnlyConnected(5000); + zk.close(); + } + + /** + * Tests a situation when client firstly connects to a read-only server and + * then connects to a majority server. Transition should be transparent for + * the user. + */ + @Test(timeout = 90000) + public void testSessionEstablishment() throws Exception { + qu.shutdown(2); + + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, + watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); + Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY, zk + .getState()); + long fakeId = zk.getSessionId(); + LOG.info("Connected as r/o mode with state {} and session id {}", + zk.getState(), fakeId); + + watcher.reset(); + qu.start(2); + Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp( + "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT)); + LOG.info("Server 127.0.0.1:{} is up", qu.getPeer(2).clientPort); + // ZOOKEEPER-2722: wait until we can connect to a read-write server after the quorum + // is formed. Otherwise, it is possible that client first connects to a read-only server, + // then drops the connection because of shutting down of the read-only server caused + // by leader election / quorum forming between the read-only server and the newly started + // server. If we happen to execute the zk.create after the read-only server is shutdown and + // before the quorum is formed, we will get a ConnectLossException. + watcher.waitForSyncConnected(CONNECTION_TIMEOUT); + Assert.assertEquals("Should be in read-write mode", States.CONNECTED, + zk.getState()); + LOG.info("Connected as rw mode with state {} and session id {}", + zk.getState(), zk.getSessionId()); + zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.assertFalse("fake session and real session have same id", zk + .getSessionId() == fakeId); + zk.close(); + } + + /** + * Ensures that client seeks for r/w servers while it's connected to r/o + * server. + */ + @SuppressWarnings("deprecation") + @Test(timeout = 90000) + public void testSeekForRwServer() throws Exception { + // setup the logger to capture all logs + Layout layout = Logger.getRootLogger().getAppender("CONSOLE") + .getLayout(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + WriterAppender appender = new WriterAppender(layout, os); + appender.setImmediateFlush(true); + appender.setThreshold(Level.INFO); + Logger zlogger = Logger.getLogger("org.apache.zookeeper"); + zlogger.addAppender(appender); + + try { + qu.shutdown(2); + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), + CONNECTION_TIMEOUT, watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // if we don't suspend a peer it will rejoin a quorum + qu.getPeer(1).peer.suspend(); + + // start two servers to form a quorum; client should detect this and + // connect to one of them + watcher.reset(); + qu.start(2); + qu.start(3); + ClientBase.waitForServerUp(qu.getConnString(), 2000); + watcher.waitForConnected(CONNECTION_TIMEOUT); + zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // resume poor fellow + qu.getPeer(1).peer.resume(); + } finally { + zlogger.removeAppender(appender); + } + + os.close(); + LineNumberReader r = new LineNumberReader(new StringReader(os + .toString())); + String line; + Pattern p = Pattern.compile(".*Majority server found.*"); + boolean found = false; + while ((line = r.readLine()) != null) { + if (p.matcher(line).matches()) { + found = true; + break; + } + } + Assert.assertTrue( + "Majority server wasn't found while connected to r/o server", + found); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java new file mode 100644 index 0000000..5eda4b0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReconfigExceptionTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory + .getLogger(ReconfigExceptionTest.class); + private static String authProvider = "zookeeper.DigestAuthenticationProvider.superDigest"; + // Use DigestAuthenticationProvider.base64Encode or + // run ZooKeeper jar with org.apache.zookeeper.server.auth.DigestAuthenticationProvider to generate password. + // An example: + // java -cp zookeeper-3.6.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.5.jar: + // lib/slf4j-api-1.7.5.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider super:test + // The password here is 'test'. + private static String superDigest = "super:D/InIHSb7yEEbrWz8b9l71RjZJU="; + private QuorumUtil qu; + private ZooKeeperAdmin zkAdmin; + + @Before + public void setup() throws InterruptedException { + System.setProperty(authProvider, superDigest); + QuorumPeerConfig.setReconfigEnabled(true); + + // Get a three server quorum. + qu = new QuorumUtil(1); + qu.disableJMXTest = true; + + try { + qu.startAll(); + } catch (IOException e) { + Assert.fail("Fail to start quorum servers."); + } + + resetZKAdmin(); + } + + @After + public void tearDown() throws Exception { + System.clearProperty(authProvider); + try { + if (qu != null) { + qu.tearDown(); + } + if (zkAdmin != null) { + zkAdmin.close(); + } + } catch (Exception e) { + // Ignore. + } + } + + @Test(timeout = 10000) + public void testReconfigDisabled() throws InterruptedException { + QuorumPeerConfig.setReconfigEnabled(false); + try { + reconfigPort(); + Assert.fail("Reconfig should be disabled."); + } catch (KeeperException e) { + Assert.assertTrue(e.code() == KeeperException.Code.RECONFIGDISABLED); + } + } + + @Test(timeout = 10000) + public void testReconfigFailWithoutAuth() throws InterruptedException { + try { + reconfigPort(); + Assert.fail("Reconfig should fail without auth."); + } catch (KeeperException e) { + // However a failure is still expected as user is not authenticated, so ACL check will fail. + Assert.assertTrue(e.code() == KeeperException.Code.NOAUTH); + } + } + + @Test(timeout = 10000) + public void testReconfigEnabledWithSuperUser() throws InterruptedException { + try { + zkAdmin.addAuthInfo("digest", "super:test".getBytes()); + Assert.assertTrue(reconfigPort()); + } catch (KeeperException e) { + Assert.fail("Reconfig should not fail, but failed with exception : " + e.getMessage()); + } + } + + @Test(timeout = 10000) + public void testReconfigFailWithAuthWithNoACL() throws InterruptedException { + resetZKAdmin(); + + try { + zkAdmin.addAuthInfo("digest", "user:test".getBytes()); + reconfigPort(); + Assert.fail("Reconfig should fail without a valid ACL associated with user."); + } catch (KeeperException e) { + // Again failure is expected because no ACL is associated with this user. + Assert.assertTrue(e.code() == KeeperException.Code.NOAUTH); + } + } + + @Test(timeout = 10000) + public void testReconfigEnabledWithAuthAndWrongACL() throws InterruptedException { + resetZKAdmin(); + + try { + zkAdmin.addAuthInfo("digest", "super:test".getBytes()); + // There is ACL however the permission is wrong - need WRITE permission at leaste. + ArrayList<ACL> acls = new ArrayList<ACL>( + Collections.singletonList( + new ACL(ZooDefs.Perms.READ, + new Id("digest", "user:tl+z3z0vO6PfPfEENfLF96E6pM0="/* password is test */)))); + zkAdmin.setACL(ZooDefs.CONFIG_NODE, acls, -1); + resetZKAdmin(); + zkAdmin.addAuthInfo("digest", "user:test".getBytes()); + reconfigPort(); + Assert.fail("Reconfig should fail with an ACL that is read only!"); + } catch (KeeperException e) { + Assert.assertTrue(e.code() == KeeperException.Code.NOAUTH); + } + } + + @Test(timeout = 10000) + public void testReconfigEnabledWithAuthAndACL() throws InterruptedException { + resetZKAdmin(); + + try { + zkAdmin.addAuthInfo("digest", "super:test".getBytes()); + ArrayList<ACL> acls = new ArrayList<ACL>( + Collections.singletonList( + new ACL(ZooDefs.Perms.WRITE, + new Id("digest", "user:tl+z3z0vO6PfPfEENfLF96E6pM0="/* password is test */)))); + zkAdmin.setACL(ZooDefs.CONFIG_NODE, acls, -1); + resetZKAdmin(); + zkAdmin.addAuthInfo("digest", "user:test".getBytes()); + Assert.assertTrue(reconfigPort()); + } catch (KeeperException e) { + Assert.fail("Reconfig should not fail, but failed with exception : " + e.getMessage()); + } + } + + // Utility method that recreates a new ZooKeeperAdmin handle, and wait for the handle to connect to + // quorum servers. + private void resetZKAdmin() throws InterruptedException { + String cnxString; + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + try { + cnxString = "127.0.0.1:" + qu.getPeer(1).peer.getClientPort(); + if (zkAdmin != null) { + zkAdmin.close(); + } + zkAdmin = new ZooKeeperAdmin(cnxString, + ClientBase.CONNECTION_TIMEOUT, watcher); + } catch (IOException e) { + Assert.fail("Fail to create ZooKeeperAdmin handle."); + return; + } + + try { + watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + } catch (InterruptedException | TimeoutException e) { + Assert.fail("ZooKeeper admin client can not connect to " + cnxString); + } + } + + private boolean reconfigPort() throws KeeperException, InterruptedException { + List<String> joiningServers = new ArrayList<String>(); + int leaderId = 1; + while (qu.getPeer(leaderId).peer.leader == null) + leaderId++; + int followerId = leaderId == 1 ? 2 : 1; + joiningServers.add("server." + followerId + "=localhost:" + + qu.getPeer(followerId).peer.getQuorumAddress().getPort() /*quorum port*/ + + ":" + qu.getPeer(followerId).peer.getElectionAddress().getPort() /*election port*/ + + ":participant;localhost:" + PortAssignment.unique()/* new client port */); + zkAdmin.reconfigure(joiningServers, null, null, -1, new Stat()); + return true; + } +}