Author: phunt
Date: Wed Oct  6 17:02:16 2010
New Revision: 1005124

URL: http://svn.apache.org/viewvc?rev=1005124&view=rev
Log:
ZOOKEEPER-822. Leader election taking a long time to complete

Modified:
    hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
    hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1005124&r1=1005123&r2=1005124&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Wed Oct  6 17:02:16 2010
@@ -32,6 +32,9 @@ BUGFIXES:
   ZOOKEEPER-844. handle auth failure in java client
   (Camille Fournier via phunt)
 
+  ZOOKEEPER-822. Leader election taking a long time to complete
+  (Vishal K via phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-789. Improve FLE log messages (flavio via phunt)

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=1005124&r1=1005123&r2=1005124&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ hadoop/zookeeper/branches/branch-3.3/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Wed Oct  6 17:02:16 2010
@@ -919,6 +919,23 @@ server.3=zoo3:2888:3888</programlisting>
               </para>
             </listitem>
           </varlistentry>
+          
+          <varlistentry>
+            <term>cnxTimeout</term>
+
+            <listitem>
+              <para>(Java system property: zookeeper.<emphasis
+              role="bold">cnxTimeout</emphasis>)</para>
+
+              <para>Sets the timeout value for opening connections for leader election notifications.
+              Only applicable if you are using electionAlg 3.
+              </para>
+
+              <note>
+                <para>Default value is 5 seconds.</para>
+              </note>
+            </listitem>
+          </varlistentry>
         </variablelist>
         <para></para>
       </section>

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1005124&r1=1005123&r2=1005124&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Oct  6 17:02:16 2010
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
 
 /**
  * This class implements a connection manager for leader election using TCP. It
- * maintains one coonection for every pair of servers. The tricky part is to
+ * maintains one connection for every pair of servers. The tricky part is to
  * guarantee that there is exactly one connection for every pair of servers that
  * are operating correctly and that can communicate over the network.
  * 
@@ -75,6 +75,12 @@ public class QuorumCnxManager {
     private long observerCounter = -1;
     
     /*
+     * Connection time out value in milliseconds 
+     */
+    
+    private int cnxTO = 5000;
+    
+    /*
      * Local IP address
      */
     final QuorumPeer self;
@@ -118,6 +124,11 @@ public class QuorumCnxManager {
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
         this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
         
+        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
+        if(cnxToValue != null){
+            this.cnxTO = new Integer(cnxToValue); 
+        }
+        
         this.self = self;
 
         // Starts listener thread that waits for connection requests 
@@ -131,9 +142,12 @@ public class QuorumCnxManager {
      */
     public void testInitiateConnection(long sid) throws Exception {
         SocketChannel channel;
-        LOG.debug("Opening channel to server "  + sid);
-        channel = SocketChannel
-                .open(self.getVotingView().get(sid).electionAddr);
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Opening channel to server "  + sid);
+        }
+        
+        channel = SocketChannel.open();
+        channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO);
         channel.socket().setTcpNoDelay(true);
         initiateConnection(channel, sid);
     }
@@ -173,11 +187,11 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            senderWorkerMap.put(sid, sw);
             
             if(vsw != null)
                 vsw.finish();
-
+            
+            senderWorkerMap.put(sid, sw);
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                         CAPACITY));
@@ -258,11 +272,12 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            senderWorkerMap.put(sid, sw);
             
             if(vsw != null)
                 vsw.finish();
-
+            
+            senderWorkerMap.put(sid, sw);
+            
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                         CAPACITY));
@@ -343,9 +358,12 @@ public class QuorumCnxManager {
             }
             try {
                 SocketChannel channel;
-                LOG.debug("Opening channel to server "  + sid);
-                channel = SocketChannel
-                        .open(self.getView().get(sid).electionAddr);
+                if(LOG.isDebugEnabled()){
+                    LOG.debug("Opening channel to server "  + sid);
+                }
+                
+                channel = SocketChannel.open();
+                channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO);
                 channel.socket().setTcpNoDelay(true);
                 initiateConnection(channel, sid);
             } catch (UnresolvedAddressException e) {
@@ -518,10 +536,19 @@ public class QuorumCnxManager {
         }
                 
         synchronized boolean finish() {
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Calling finish");
+            }
+            
+            if(!running){
+                /*
+                 * Avoids running finish() twice. 
+                 */
+                return running;
+            }
+            
             running = false;
-
-            LOG.debug("Calling finish");
-            this.interrupt();
+            
             try{
                 channel.close();
             } catch (IOException e) {
@@ -532,6 +559,10 @@ public class QuorumCnxManager {
             this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();
+            
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
+            }
             senderWorkerMap.remove(sid);
             return running;
         }
@@ -583,7 +614,8 @@ public class QuorumCnxManager {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Exception when using channel: " + sid, e);
+                LOG.warn("Exception when using channel: for id " + sid + " my id = " +
+                        self.getId() + " error = " + e);
             }
             this.finish();
             LOG.warn("Send worker leaving thread");
@@ -610,7 +642,14 @@ public class QuorumCnxManager {
          * @return boolean  Value of variable running
          */
         synchronized boolean finish() {
-            running = false;
+            if(!running){
+                /*
+                 * Avoids running finish() twice. 
+                 */
+                return running;
+            }
+            running = false;            
+
             this.interrupt();
             return running;
         }
@@ -655,7 +694,8 @@ public class QuorumCnxManager {
                 }
 
             } catch (Exception e) {
-                LOG.warn("Connection broken: ", e);
+                LOG.warn("Connection broken for id " + sid + ", my id = " + 
+                        self.getId() + ", error = " + e);
             } finally {
                 try{
                     channel.socket().close();

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1005124&r1=1005123&r2=1005124&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Oct  6 17:02:16 2010
@@ -22,6 +22,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
@@ -178,6 +179,38 @@ public class CnxManagerTest extends Test
         }
     }
     
+    @Test
+    public void testCnxManagerTimeout() throws Exception {
+        Random rand = new Random();
+        byte b = (byte) rand.nextInt();
+        int deadPort = PortAssignment.unique();
+        String deadAddress = new String("10.1.1." + b);
+    
+        LOG.info("This is the dead address I'm trying: " + deadAddress);
+    
+        peers.put(Long.valueOf(2),
+                new QuorumServer(2,
+                        new InetSocketAddress(deadAddress, deadPort),
+                        new InetSocketAddress(deadAddress, PortAssignment.unique())));
+        tmpdir[2] = ClientBase.createTmpDir();
+        port[2] = deadPort;
     
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        long begin = System.currentTimeMillis();
+        cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+        long end = System.currentTimeMillis();
+    
+        if((end - begin) > 6000) fail("Waited more than necessary");
+    
+    }
+ 
     
 }
\ No newline at end of file


Reply via email to