Author: phunt
Date: Tue Oct 19 22:32:09 2010
New Revision: 1024438

URL: http://svn.apache.org/viewvc?rev=1024438&view=rev
Log:
ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1024438&r1=1024437&r2=1024438&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 19 22:32:09 2010
@@ -110,14 +110,20 @@ BUGFIXES: 
   ZOOKEEPER-822. Leader election taking a long time to complete
   (Vishal K via phunt)
 
-  ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when connection to ZK dies but gets reconnected (erwin tam via breed)
+  ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when
+  connection to ZK dies but gets reconnected (erwin tam via breed)
 
-  ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice (jared cantwell via breed)
+  ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice
+  (jared cantwell via breed)
 
   ZOOKEEPER-855. clientPortBindAddress should be clientPortAddress
   (Jared Cantwell via fpj)
 
-  ZOOKEEPER-888. c-client / zkpython: Double free corruption on node watcher (Austin Shoemaker via henryr)
+  ZOOKEEPER-888. c-client / zkpython: Double free corruption on
+  node watcher (Austin Shoemaker via henryr)
+
+  ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests
+  (Thijs Terlouw via phunt)
 
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1024438&r1=1024437&r2=1024438&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Oct 19 22:32:09 2010
@@ -673,28 +673,33 @@ public class QuorumCnxManager {
                     }
                     msgLength.position(0);
                     int length = msgLength.getInt();
+                    if(length <= 0) {
+                        throw new IOException("Invalid packet length:" + length);
+                    }
                     /**
                      * Allocates a new ByteBuffer to receive the message
                      */
-                    if (length > 0) {
-                        if (length > PACKETMAXSIZE) {
-                            throw new IOException("Invalid packet of length " + length);
-                        }
-                        byte[] msgArray = new byte[length];
-                        ByteBuffer message = ByteBuffer.wrap(msgArray);
-                        int numbytes = 0;
-                        while (message.hasRemaining()) {
-                            numbytes += channel.read(message);
-                        }
-                        message.position(0);
-                        synchronized (recvQueue) {
-                            recvQueue
-                                    .put(new Message(message.duplicate(), sid));
+                    if (length > PACKETMAXSIZE) {
+                        throw new IOException("Invalid packet of length " + length);
+                    }
+                    byte[] msgArray = new byte[length];
+                    ByteBuffer message = ByteBuffer.wrap(msgArray);
+                    int numbytes = 0;
+                    int temp_numbytes = 0;
+                    while (message.hasRemaining()) {
+                        temp_numbytes = channel.read(message); 
+                        if(temp_numbytes < 0) {
+                            throw new IOException("Channel eof before end");
                         }
-                        msgLength.position(0);
+                        numbytes += temp_numbytes;
+                    }
+                    message.position(0);
+                    synchronized (recvQueue) {
+                        recvQueue
+                        .put(new Message(message.duplicate(), sid));
                     }
+                    msgLength.position(0);
                 }
-
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                         self.getId() + ", error = " + e);

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1024438&r1=1024437&r2=1024438&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Tue Oct 19 22:32:09 2010
@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -196,4 +197,63 @@ public class CnxManagerTest extends ZKTe
         if((end - begin) > 6000) Assert.fail("Waited more than necessary");
         
     }       
+    
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the 
+     * connection is being closed upon a message with negative
+     * length.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {               
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        
+        Thread.sleep(1000);
+        
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000);
+        
+        /*
+         * Write id first then negative length.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(new Long(2));
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        msgBuffer = ByteBuffer.wrap(new byte[4]);
+        msgBuffer.putInt(-20);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        Thread.sleep(1000);
+        
+        try{
+            /*
+             * Write a number of times until it
+             * detects that the socket is broken.
+             */
+            for(int i = 0; i < 100; i++){
+                msgBuffer.position(0);
+                sc.write(msgBuffer);
+            }
+            Assert.fail("Socket has not been closed");
+        } catch (Exception e) {
+            LOG.info("Socket has been closed as expected");
+        }
+    }
 }
\ No newline at end of file


Reply via email to