Author: phunt Date: Wed Oct 6 17:02:16 2010 New Revision: 1005124 URL: http://svn.apache.org/viewvc?rev=1005124&view=rev Log: ZOOKEEPER-822. Leader election taking a long time to complete
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1005124&r1=1005123&r2=1005124&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Wed Oct 6 17:02:16 2010 @@ -32,6 +32,9 @@ BUGFIXES: ZOOKEEPER-844. handle auth failure in java client (Camille Fournier via phunt) + ZOOKEEPER-822. Leader election taking a long time to complete + (Vishal K via phunt) + IMPROVEMENTS: ZOOKEEPER-789. Improve FLE log messages (flavio via phunt) Modified: hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=1005124&r1=1005123&r2=1005124&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original) +++ hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Wed Oct 6 17:02:16 2010 @@ -919,6 +919,23 @@ server.3=zoo3:2888:3888</programlisting> </para> </listitem> </varlistentry> + + <varlistentry> + <term>cnxTimeout</term> + + <listitem> + <para>(Java system property: zookeeper.<emphasis + role="bold">cnxTimeout</emphasis>)</para> + + <para>Sets the timeout value for opening connections for leader election notifications. 
+ Only applicable if you are using electionAlg 3. The value is specified in milliseconds. + </para> + + <note> + <para>Default value is 5000 milliseconds (5 seconds).</para> + </note> + </listitem> + </varlistentry> </variablelist> <para></para> </section> Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1005124&r1=1005123&r2=1005124&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Oct 6 17:02:16 2010 @@ -35,7 +35,7 @@ import org.apache.log4j.Logger; /** * This class implements a connection manager for leader election using TCP. It - * maintains one coonection for every pair of servers. The tricky part is to + * maintains one connection for every pair of servers. The tricky part is to * guarantee that there is exactly one connection for every pair of servers that * are operating correctly and that can communicate over the network.
* @@ -75,6 +75,12 @@ public class QuorumCnxManager { private long observerCounter = -1; /* + * Connection time out value in milliseconds + */ + + private int cnxTO = 5000; + + /* * Local IP address */ final QuorumPeer self; @@ -118,6 +124,11 @@ public class QuorumCnxManager { this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>(); this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>(); + String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); + if(cnxToValue != null){ + this.cnxTO = new Integer(cnxToValue); + } + this.self = self; // Starts listener thread that waits for connection requests @@ -131,9 +142,12 @@ public class QuorumCnxManager { */ public void testInitiateConnection(long sid) throws Exception { SocketChannel channel; - LOG.debug("Opening channel to server " + sid); - channel = SocketChannel - .open(self.getVotingView().get(sid).electionAddr); + if(LOG.isDebugEnabled()){ + LOG.debug("Opening channel to server " + sid); + } + + channel = SocketChannel.open(); + channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO); channel.socket().setTcpNoDelay(true); initiateConnection(channel, sid); } @@ -173,11 +187,11 @@ public class QuorumCnxManager { sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); - senderWorkerMap.put(sid, sw); if(vsw != null) vsw.finish(); - + + senderWorkerMap.put(sid, sw); if (!queueSendMap.containsKey(sid)) { queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( CAPACITY)); @@ -258,11 +272,12 @@ public class QuorumCnxManager { sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); - senderWorkerMap.put(sid, sw); if(vsw != null) vsw.finish(); - + + senderWorkerMap.put(sid, sw); + if (!queueSendMap.containsKey(sid)) { queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( CAPACITY)); @@ -343,9 +358,12 @@ public class QuorumCnxManager { } try { SocketChannel channel; - LOG.debug("Opening channel to server " + sid); - channel = SocketChannel - 
.open(self.getView().get(sid).electionAddr); + if(LOG.isDebugEnabled()){ + LOG.debug("Opening channel to server " + sid); + } + + channel = SocketChannel.open(); + channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO); channel.socket().setTcpNoDelay(true); initiateConnection(channel, sid); } catch (UnresolvedAddressException e) { @@ -518,10 +536,19 @@ public class QuorumCnxManager { } synchronized boolean finish() { + if(LOG.isDebugEnabled()){ + LOG.debug("Calling finish"); + } + + if(!running){ + /* + * Avoids running finish() twice. + */ + return running; + } + running = false; - - LOG.debug("Calling finish"); - this.interrupt(); + try{ channel.close(); } catch (IOException e) { @@ -532,6 +559,10 @@ public class QuorumCnxManager { this.interrupt(); if (recvWorker != null) recvWorker.finish(); + + if(LOG.isDebugEnabled()){ + LOG.debug("Removing entry from senderWorkerMap sid=" + sid); + } senderWorkerMap.remove(sid); return running; } @@ -583,7 +614,8 @@ public class QuorumCnxManager { } } } catch (Exception e) { - LOG.warn("Exception when using channel: " + sid, e); + LOG.warn("Exception when using channel: for id " + sid + " my id = " + + self.getId() + " error = " + e); } this.finish(); LOG.warn("Send worker leaving thread"); @@ -610,7 +642,14 @@ public class QuorumCnxManager { * @return boolean Value of variable running */ synchronized boolean finish() { - running = false; + if(!running){ + /* + * Avoids running finish() twice. 
+ */ + return running; + } + running = false; + this.interrupt(); return running; } @@ -655,7 +694,8 @@ public class QuorumCnxManager { } } catch (Exception e) { - LOG.warn("Connection broken: ", e); + LOG.warn("Connection broken for id " + sid + ", my id = " + + self.getId() + ", error = " + e); } finally { try{ channel.socket().close(); Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1005124&r1=1005123&r2=1005124&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Oct 6 17:02:16 2010 @@ -22,6 +22,7 @@ import java.io.File; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.Random; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; @@ -178,6 +179,38 @@ public class CnxManagerTest extends Test } } + @Test + public void testCnxManagerTimeout() throws Exception { + Random rand = new Random(); + byte b = (byte) rand.nextInt(); + int deadPort = PortAssignment.unique(); + String deadAddress = new String("10.1.1." 
+ b); + + LOG.info("This is the dead address I'm trying: " + deadAddress); + + peers.put(Long.valueOf(2), + new QuorumServer(2, + new InetSocketAddress(deadAddress, deadPort), + new InetSocketAddress(deadAddress, PortAssignment.unique()))); + tmpdir[2] = ClientBase.createTmpDir(); + port[2] = deadPort; + QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + long begin = System.currentTimeMillis(); + cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); + long end = System.currentTimeMillis(); + + if((end - begin) > 6000) fail("Waited more than necessary"); + + } + } \ No newline at end of file