Author: mahadev Date: Thu Jul 29 21:05:51 2010 New Revision: 980572 URL: http://svn.apache.org/viewvc?rev=980572&view=rev Log: ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (flavio via mahadev)
Added: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Thu Jul 29 21:05:51 2010 @@ -6,6 +6,9 @@ BUGFIXES: ZOOKEEPER-783. committedLog in ZKDatabase is not properly synchronized (henry via mahadev) + ZOOKEEPER-790. Last processed zxid set prematurely while establishing + leadership (flavio via mahadev) + IMPROVEMENTS: ZOOKEEPER-789. Improve FLE log messages (flavio via phunt) Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Thu Jul 29 21:05:51 2010 @@ -157,6 +157,7 @@ public class NIOServerCnxn implements Wa public void startup(ZooKeeperServer zks) throws IOException, InterruptedException { start(); + zks.startdata(); zks.startup(); setZooKeeperServer(zks); } Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu Jul 29 21:05:51 2010 @@ -357,14 +357,18 @@ public class ZooKeeperServer implements } } - public void startup() throws IOException, InterruptedException { + public void startdata() + throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); - } + } if (!zkDb.isInitialized()) { loadData(); } + } + + public void startup() { createSessionTracker(); setupRequestProcessors(); Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jul 29 21:05:51 2010 @@ -327,12 +327,6 @@ public class Leader { self.tick++; } - if(LOG.isInfoEnabled()){ - LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid()); - } - zk.startup(); - zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); - if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { self.cnxnFactory.setZooKeeperServer(zk); } @@ -499,6 +493,11 @@ public class Leader { return; } else { lastCommitted = zxid; + if(LOG.isInfoEnabled()){ + LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid()); + } + zk.startup(); + zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); } } } Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Jul 29 21:05:51 2010 @@ -352,7 +352,29 @@ public class LearnerHandler extends Thre } } }.start(); - + + /* + * Have to wait for the first ACK, wait until + * the leader is ready, and only then we can + * start processing messages. + */ + qp = new QuorumPacket(); + ia.readRecord(qp, "packet"); + if(qp.getType() != Leader.ACK){ + LOG.error("Next packet was supposed to be an ACK"); + return; + } + leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); + + /* + * Wait until leader starts up + */ + synchronized(leader.zk){ + while(!leader.zk.isRunning()){ + leader.zk.wait(500); + } + } + while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); @@ -475,6 +497,7 @@ public class LearnerHandler extends Thre } catch (IOException e) { LOG.warn("Ignoring unexpected exception during socket close", e); } + this.interrupt(); leader.removeLearnerHandler(this); } Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=980572&r1=980571&r2=980572&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Jul 29 21:05:51 2010 @@ -263,7 +263,41 @@ public class QuorumTest extends QuorumBa } zk.close(); } - + /** + * See ZOOKEEPER-790 for details + * */ + @Test + public void testFollowersStartAfterLeader() throws Exception { + QuorumUtil qu = new QuorumUtil(1); + CountdownWatcher watcher = new CountdownWatcher(); + qu.startQuorum(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; + + ZooKeeper zk = new ZooKeeper( + "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // break the quorum + qu.shutdown(index); + + // try to reestablish the quorum + qu.start(index); + assertTrue("quorum reestablishment failed", + QuorumBase.waitForServerUp( + "127.0.0.1:" + qu.getPeer(2).clientPort, + CONNECTION_TIMEOUT)); + Thread.sleep(1000); + + // zk should have reconnected already + zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.close(); + } + /** * Tests if closeSession can be logged before a leader gets established, which * could lead to a locked-out follower (see ZOOKEEPER-790). @@ -289,20 +323,24 @@ public class QuorumTest extends QuorumBa throws IOException, InterruptedException, KeeperException{ final Semaphore sem = new Semaphore(0); - Leader leader = qb.s1.leader; - if (leader == null) leader = qb.s2.leader; - if (leader == null) leader = qb.s3.leader; - if (leader == null) leader = qb.s4.leader; - if (leader == null) leader = qb.s5.leader; + QuorumUtil qu = new QuorumUtil(2); + qu.startQuorum(); + - assertNotNull(leader); + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; - int serverPort = qb.s1.getClientPort(); - if(qb.s1.leader != null){ - serverPort = qb.s2.getClientPort(); - } + Leader leader = qu.getPeer(index).peer.leader; - ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 1000, new Watcher() { + assertNotNull(leader); + + /* + * Reusing the index variable to select a follower to connect to + */ + index = (index == 1) ? 2 : 1; + + ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, new Watcher() { public void process(WatchedEvent event) { }}); @@ -323,13 +361,12 @@ public class QuorumTest extends QuorumBa }, null); if(i == 5000){ - qb.shutdown(qb.s1); + qu.shutdown(index); LOG.info("Shutting down s1"); } if(i == 12000){ - qb.setupServer(1); - qb.s1.start(); - LOG.info("Setting up s1"); + qu.start(index); + LOG.info("Setting up server: " + index); } if((i % 1000) == 0){ Thread.sleep(500); @@ -340,12 +377,14 @@ public class QuorumTest extends QuorumBa sem.tryAcquire(15000, TimeUnit.MILLISECONDS); // Verify that server is following and has the same epoch as the leader - assertTrue("Not following", qb.s1.follower != null); - long epochF = (qb.s1.getActiveServer().getZxid() >> 32L); + assertTrue("Not following", qu.getPeer(index).peer.follower != null); + long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); long epochL = (leader.getEpoch() >> 32L); - assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() + + assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZxid() + "Current epoch: " + epochF, epochF == epochL); } - // skip superhammer and clientcleanup as they are too expensive for quorum + + + // skip superhammer and clientcleanup as they are too expensive for quorum } Added: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=980572&view=auto ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (added) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Jul 29 21:05:51 2010 @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.junit.Assert; + +import com.sun.management.UnixOperatingSystemMXBean; + +/** + * Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all + * peers, particular peer, n peers etc. + */ +public class QuorumUtil { + + // TODO partitioning of peers and clients + + // TODO refactor QuorumBase to be special case of this + + private static final Logger LOG = Logger.getLogger(QuorumUtil.class); + + public class PeerStruct { + public int id; + public QuorumPeer peer; + public File dataDir; + public int clientPort; + } + + private final Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>(); + + private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>(); + + private final int N; + + private final int ALL; + + private String hostPort; + + private int tickTime; + + private int initLimit; + + private int syncLimit; + + private int electionAlg; + + /** + * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble. + * + * @param n + * number of peers in the ensemble will be 2n+1 + */ + public QuorumUtil(int n) throws RuntimeException { + try { + ClientBase.setupTestEnv(); + JMXEnv.setUp(); + + N = n; + ALL = 2 * N + 1; + tickTime = 2000; + initLimit = 3; + syncLimit = 3; + electionAlg = 3; + hostPort = ""; + + for (int i = 1; i <= ALL; ++i) { + PeerStruct ps = new PeerStruct(); + ps.id = i; + ps.dataDir = ClientBase.createTmpDir(); + ps.clientPort = PortAssignment.unique(); + peers.put(i, ps); + + peersView.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress( + "127.0.0.1", ps.clientPort + 1000), new InetSocketAddress("127.0.0.1", + PortAssignment.unique() + 1000), LearnerType.PARTICIPANT)); + hostPort += "127.0.0.1:" + ps.clientPort + ((i == ALL) ? "" : ","); + } + for (int i = 1; i <= ALL; ++i) { + PeerStruct ps = peers.get(i); + LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, + electionAlg, ps.id, tickTime, initLimit, syncLimit); + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public PeerStruct getPeer(int id) { + return peers.get(id); + } + + public void startAll() throws IOException { + for (int i = 1; i <= ALL; ++i) { + start(i); + LOG.info("Started QuorumPeer " + i); + } + + LOG.info("Checking ports " + hostPort); + for (String hp : hostPort.split(",")) { + Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp, + ClientBase.CONNECTION_TIMEOUT)); + LOG.info(hp + " is accepting client connections"); + } + + // interesting to see what's there... + try { + JMXEnv.dump(); + // make sure we have all servers listed + Set<String> ensureNames = new LinkedHashSet<String>(); + for (int i = 1; i <= ALL; ++i) { + ensureNames.add("InMemoryDataTree"); + } + for (int i = 1; i <= ALL; ++i) { + ensureNames + .add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2="); + } + for (int i = 1; i <= ALL; ++i) { + for (int j = 1; j <= ALL; ++j) { + ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j); + } + } + for (int i = 1; i <= ALL; ++i) { + ensureNames.add("name0=ReplicatedServer_id" + i); + } + JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); + } catch (IOException e) { + LOG.warn("IOException during JMXEnv operation", e); + } + } + + /** + * Start first N+1 peers. + */ + public void startQuorum() throws IOException { + shutdownAll(); + for (int i = 1; i <= N + 1; ++i) { + start(i); + } + for (int i = 1; i <= N + 1; ++i) { + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(i).clientPort, ClientBase.CONNECTION_TIMEOUT)); + } + } + + public void start(int id) throws IOException { + PeerStruct ps = getPeer(id); + LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, + ps.id, tickTime, initLimit, syncLimit); + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + + ps.peer.start(); + } + + public void shutdownAll() { + for (int i = 1; i <= ALL; ++i) { + shutdown(i); + } + for (String hp : hostPort.split(",")) { + Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown(hp, + ClientBase.CONNECTION_TIMEOUT)); + LOG.info(hp + " is no longer accepting client connections"); + } + } + + public void shutdown(int id) { + QuorumPeer qp = getPeer(id).peer; + try { + LOG.info("Shutting down quorum peer " + qp.getName()); + qp.shutdown(); + Election e = qp.getElectionAlg(); + if (e != null) { + LOG.info("Shutting down leader election " + qp.getName()); + e.shutdown(); + } else { + LOG.info("No election available to shutdown " + qp.getName()); + } + LOG.info("Waiting for " + qp.getName() + " to exit thread"); + qp.join(30000); + if (qp.isAlive()) { + Assert.fail("QP failed to shutdown in 30 seconds: " + qp.getName()); + } + } catch (InterruptedException e) { + LOG.debug("QP interrupted: " + qp.getName(), e); + } + } + + public String getConnString() { + return hostPort; + } + + public void tearDown() throws Exception { + LOG.info("TearDown started"); + + OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean(); + if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) { + UnixOperatingSystemMXBean unixos = (UnixOperatingSystemMXBean) osMbean; + LOG.info("fdcount after test is: " + unixos.getOpenFileDescriptorCount()); + } + + shutdownAll(); + JMXEnv.tearDown(); + } +}