Author: phunt
Date: Tue Oct 19 22:32:09 2010
New Revision: 1024438

URL: http://svn.apache.org/viewvc?rev=1024438&view=rev
Log:
ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests

Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1024438&r1=1024437&r2=1024438&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 19 22:32:09 2010 @@ -110,14 +110,20 @@ BUGFIXES: ZOOKEEPER-822. Leader election taking a long time to complete (Vishal K via phunt) - ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when connection to ZK dies but gets reconnected (erwin tam via breed) + ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when + connection to ZK dies but gets reconnected (erwin tam via breed) - ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice (jared cantwell via breed) + ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice + (jared cantwell via breed) ZOOKEEPER-855. clientPortBindAddress should be clientPortAddress (Jared Cantwell via fpj) - ZOOKEEPER-888. c-client / zkpython: Double free corruption on node watcher (Austin Shoemaker via henryr) + ZOOKEEPER-888. c-client / zkpython: Double free corruption on + node watcher (Austin Shoemaker via henryr) + + ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests + (Thijs Terlouw via phunt) IMPROVEMENTS: ZOOKEEPER-724. 
Improve junit test integration - log harness information Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1024438&r1=1024437&r2=1024438&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Oct 19 22:32:09 2010 @@ -673,28 +673,33 @@ public class QuorumCnxManager { } msgLength.position(0); int length = msgLength.getInt(); + if(length <= 0) { + throw new IOException("Invalid packet length:" + length); + } /** * Allocates a new ByteBuffer to receive the message */ - if (length > 0) { - if (length > PACKETMAXSIZE) { - throw new IOException("Invalid packet of length " + length); - } - byte[] msgArray = new byte[length]; - ByteBuffer message = ByteBuffer.wrap(msgArray); - int numbytes = 0; - while (message.hasRemaining()) { - numbytes += channel.read(message); - } - message.position(0); - synchronized (recvQueue) { - recvQueue - .put(new Message(message.duplicate(), sid)); + if (length > PACKETMAXSIZE) { + throw new IOException("Invalid packet of length " + length); + } + byte[] msgArray = new byte[length]; + ByteBuffer message = ByteBuffer.wrap(msgArray); + int numbytes = 0; + int temp_numbytes = 0; + while (message.hasRemaining()) { + temp_numbytes = channel.read(message); + if(temp_numbytes < 0) { + throw new IOException("Channel eof before end"); } - msgLength.position(0); + numbytes += temp_numbytes; + } + message.position(0); + synchronized (recvQueue) { + recvQueue + .put(new Message(message.duplicate(), sid)); } + msgLength.position(0); } - } catch (Exception e) { LOG.warn("Connection broken for id " + sid + ", my id = " + self.getId() + 
", error = " + e); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1024438&r1=1024437&r2=1024438&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Tue Oct 19 22:32:09 2010 @@ -21,6 +21,7 @@ package org.apache.zookeeper.test; import java.io.File; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -196,4 +197,63 @@ public class CnxManagerTest extends ZKTe if((end - begin) > 6000) Assert.fail("Waited more than necessary"); } + + /** + * Tests a bug in QuorumCnxManager that causes a spin lock + * when a negative value is sent. This test checks if the + * connection is being closed upon a message with negative + * length. + * + * @throws Exception + */ + @Test + public void testCnxManagerSpinLock() throws Exception { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + int port = peers.get(peer.getId()).electionAddr.getPort(); + LOG.info("Election port: " + port); + InetSocketAddress addr = new InetSocketAddress(port); + + Thread.sleep(1000); + + SocketChannel sc = SocketChannel.open(); + sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000); + + /* + * Write id first then negative length. 
+ */ + byte[] msgBytes = new byte[8]; + ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); + msgBuffer.putLong(new Long(2)); + msgBuffer.position(0); + sc.write(msgBuffer); + + msgBuffer = ByteBuffer.wrap(new byte[4]); + msgBuffer.putInt(-20); + msgBuffer.position(0); + sc.write(msgBuffer); + + Thread.sleep(1000); + + try{ + /* + * Write a number of times until it + * detects that the socket is broken. + */ + for(int i = 0; i < 100; i++){ + msgBuffer.position(0); + sc.write(msgBuffer); + } + Assert.fail("Socket has not been closed"); + } catch (Exception e) { + LOG.info("Socket has been closed as expected"); + } + } } \ No newline at end of file