fhanik 2004/01/09 15:24:09 Modified: modules/cluster/src/share/org/apache/catalina/cluster/io ObjectReader.java XByteBuffer.java modules/cluster/src/share/org/apache/catalina/cluster/session SimpleTcpReplicationManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp IDataSenderFactory.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java ThreadPool.java Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java Log: Implemented socket pool for replication since the synchronized send became a bottleneck. This is a dramatic performance improvement Revision Changes Path 1.4 +9 -8 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Index: ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ObjectReader.java 19 Dec 2003 21:22:13 -0000 1.3 +++ ObjectReader.java 9 Jan 2004 23:24:08 -0000 1.4 @@ -105,6 +105,11 @@ public int append(byte[] data,int off,int len) throws java.io.IOException { boolean result = false; buffer.append(data,off,len); + int pkgCnt = buffer.countPackages(); + return pkgCnt; + } + + public int execute() throws java.io.IOException { int pkgCnt = 0; boolean pkgExists = buffer.doesPackageExist(); while ( pkgExists ) { @@ -114,10 +119,6 @@ pkgExists = buffer.doesPackageExist(); }//end if return pkgCnt; - } - - public int execute() throws java.io.IOException { - return append(new byte[0],0,0); } public int write(ByteBuffer buf) 1.5 +66 -24 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Index: XByteBuffer.java =================================================================== RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- XByteBuffer.java 20 Dec 2003 00:48:52 -0000 1.4 +++ XByteBuffer.java 9 Jan 2004 23:24:08 -0000 1.5 @@ -180,24 +180,35 @@ * within the buffer * @return - true if a complete package (header,size,data,footer) exists within the buffer */ - protected int packageExists() + public int countPackages() { + int cnt = 0; int pos = START_DATA.length; - //first check start header - int index = this.firstIndexOf(buf,0,START_DATA); - //if the header (START_DATA) isn't the first thing or - //the buffer isn't even 10 bytes - if ( index != 0 || (bufSize<10) ) return 0; - //then get the size 4 bytes - int size = toInt(buf,pos); - //now the total buffer has to be long enough to hold - //START_DATA.length+4+size+END_DATA.length - pos = START_DATA.length+4+size; - if ( (pos+END_DATA.length) > bufSize ) return 0; - //and finally check the footer of the package END_DATA - int newpos = firstIndexOf(buf,pos,END_DATA); - if ( newpos != pos ) return 0; - return size; + int start = 0; + + while ( start < bufSize ) { + //first check start header + int index = this.firstIndexOf(buf,start,START_DATA); + //if the header (START_DATA) isn't the first thing or + //the buffer isn't even 10 bytes + if ( index != start || ((bufSize-start)<10) ) break; + //then get the size 4 bytes + int size = toInt(buf, pos); + //now the total buffer has to be long enough to hold + //START_DATA.length+4+size+END_DATA.length + pos = start + START_DATA.length + 4 + size; + if ( (pos + END_DATA.length) > bufSize) break; + //and finally check the footer of the package END_DATA + int newpos = firstIndexOf(buf, pos, END_DATA); + //mismatch, there is no package + if (newpos != pos) break; + //increase the packet count + cnt++; + //reset the values + start = pos + END_DATA.length; + pos = start + START_DATA.length; + }//while + return cnt; 
}//getSize /** @@ -205,7 +216,7 @@ * @return - true if a complete package (header,size,data,footer) exists within the buffer */ public boolean doesPackageExist() { - return (packageExists()>0); + return (countPackages()>0); }//doesPackageExist /** @@ -215,8 +226,9 @@ * @return - returns the actual message bytes (header, size and footer not included). */ public byte[] extractPackage(boolean clearFromBuffer) throws java.io.IOException { - int size = packageExists(); - if ( size == 0 ) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); + int psize = countPackages(); + if ( psize == 0 ) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); + int size = toInt(buf, START_DATA.length); byte[] data = new byte[size]; System.arraycopy(buf,START_DATA.length+4,data,0,size); if ( clearFromBuffer ) { @@ -382,7 +394,7 @@ return result; }//createDataPackage - public static void main(String[] args) { + public static void main(String[] args) throws Exception { System.out.println("Before="+Integer.MAX_VALUE); byte[] d = toBytes(Integer.MAX_VALUE); System.out.println("After="+toInt(d,0)); @@ -395,6 +407,36 @@ System.out.println("Before=" + 4564564); d = toBytes((long)4564564); System.out.println("After=" + toLong(d, 0)); + + byte[] d1 = createDataPackage(new byte[] {1}); + byte[] d2 = createDataPackage(new byte[] {2}); + byte[] d3 = createDataPackage(new byte[] {3}); + byte[] test = new byte[d1.length+d2.length+d3.length+5]; + System.arraycopy(d1,0,test,0,d1.length); + System.arraycopy(d2,0,test,d1.length,d2.length); + System.arraycopy(d3,0,test,d2.length+d1.length,d3.length); + printBuf(d1); + printBuf(d2); + printBuf(d3); + printBuf(test); + XByteBuffer b = new XByteBuffer(); + b.append(test,0,test.length); + int s = b.countPackages(); + System.out.println("Nr of packages="+s); + while ( s > 0 ) { + d = b.extractPackage(true); + System.out.print("Package d1="); + printBuf(d); + s--; + }//while + + } + + public static void 
printBuf(byte[] b) { + for ( int i=0; i<b.length; i++ ) { + System.out.print(b[i] + " "); + } + System.out.println(); } }//class 1.18 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java Index: SimpleTcpReplicationManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v retrieving revision 1.17 retrieving revision 1.18 diff -u -r1.17 -r1.18 --- SimpleTcpReplicationManager.java 15 Dec 2003 21:33:06 -0000 1.17 +++ SimpleTcpReplicationManager.java 9 Jan 2004 23:24:09 -0000 1.18 @@ -506,7 +506,7 @@ reqNow = System.currentTimeMillis(); isTimeout=((reqNow-reqStart)>(1000*60)); } while ( (!isStateTransferred()) && (!isTimeout)); - if ( isTimeout ) { + if ( isTimeout || (!isStateTransferred()) ) { log.error("Manager["+getName()+"], No session state received, timing out."); }else { log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms."); 1.2 +20 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java Index: IDataSenderFactory.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- IDataSenderFactory.java 18 Apr 2003 02:51:24 -0000 1.1 +++ IDataSenderFactory.java 9 Jan 2004 23:24:09 -0000 1.2 @@ -69,15 +69,29 @@ } public static final String SYNC_MODE="synchronous"; public static final String ASYNC_MODE="asynchronous"; - public synchronized static IDataSender getIDataSender(String mode, Member mbr) + public static final String POOLED_SYNC_MODE="pooled"; + + public synchronized static IDataSender getIDataSender(String mode, Member mbr) throws 
java.io.IOException { if (SYNC_MODE.equals(mode) ) return new SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); else if ( ASYNC_MODE.equals(mode) ) return new AsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); + if (POOLED_SYNC_MODE.equals(mode) ) + return new PooledSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); else throw new java.io.IOException("Invalid replication mode="+mode); } - -} \ No newline at end of file + public static String validateMode(String mode) { + if (SYNC_MODE.equals(mode) || + ASYNC_MODE.equals(mode) || + POOLED_SYNC_MODE.equals(mode) ) { + return null; + } else { + return "Replication mode has to be '"+SYNC_MODE+"', '"+ASYNC_MODE+"' or '"+POOLED_SYNC_MODE+"'"; + } + } + + +} 1.23 +9 -10 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.22 retrieving revision 1.23 diff -u -r1.22 -r1.23 --- SimpleTcpCluster.java 9 Jan 2004 02:50:54 -0000 1.22 +++ SimpleTcpCluster.java 9 Jan 2004 23:24:09 -0000 1.23 @@ -319,12 +319,12 @@ } public void setReplicationMode(String mode) { - if ("synchronous".equals(mode) || - "asynchronous".equals(mode)) { + String msg = IDataSenderFactory.validateMode(mode); + if ( msg == null ) { log.debug("Setting replcation mode to "+mode); this.replicationMode = mode; } else - throw new IllegalArgumentException("Replication mode must be either synchronous or asynchronous"); + throw new IllegalArgumentException(msg); } /** @@ -496,8 +496,8 @@ this.tcpAddress, this.tcpPort, this.tcpSelectorTimeout, - "synchronous".equals(this. 
- replicationMode)); + IDataSenderFactory.SYNC_MODE.equals(replicationMode) || + IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode)); mReplicationListener.setDaemon(true); mReplicationListener.start(); } @@ -592,7 +592,6 @@ log.info("Received member disappeared:"+member); try { - log.info("Replication member disappeared:" + member); Member mbr = member; mReplicationTransmitter.remove(InetAddress.getByName(mbr.getHost()), mbr.getPort()); 1.9 +12 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java Index: SocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- SocketSender.java 19 Dec 2003 22:59:24 -0000 1.8 +++ SocketSender.java 9 Jan 2004 23:24:09 -0000 1.9 @@ -132,12 +132,20 @@ return isSocketConnected; } - private void checkIfDisconnect() { + public void checkIfDisconnect() { long ctime = System.currentTimeMillis() - this.keepAliveConnectTime; if ( (ctime > this.keepAliveTimeout) || (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) { disconnect(); } + } + + public void setAckTimeout(long timeout) { + this.ackTimeout = timeout; + } + + public long getAckTimeout() { + return ackTimeout; } /** 1.7 +9 -11 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- TcpReplicationThread.java 9 Jan 2004 02:50:54 -0000 1.6 +++ TcpReplicationThread.java 9 Jan 2004 23:24:09 -0000 1.7 @@ -169,12 +169,6 @@ while ((count = channel.read (buffer)) > 
0) { buffer.flip(); // make buffer readable int pkgcnt = reader.append(buffer.array(),0,count); - while ( pkgcnt > 0 ) { - if (synchronous) { - sendAck(key,channel); - } //end if - pkgcnt--; - } buffer.clear(); // make buffer empty } //check to see if any data is available @@ -196,8 +190,12 @@ key.selector().wakeup(); } - private void sendAck(SelectionKey key, SocketChannel channel) throws java.io.IOException { + private void sendAck(SelectionKey key, SocketChannel channel) { //send a reply-acknowledgement - channel.write(ByteBuffer.wrap(new byte[] {6,2,3})); + try { + channel.write(ByteBuffer.wrap(new byte[] {6, 2, 3})); + } catch ( java.io.IOException x ) { + log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); + } } } 1.4 +7 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java Index: ThreadPool.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ThreadPool.java 20 Dec 2003 00:48:52 -0000 1.3 +++ ThreadPool.java 9 Jan 2004 23:24:09 -0000 1.4 @@ -107,11 +107,11 @@ { WorkerThread worker = null; - //synchronized (idle) { + synchronized (idle) { if (idle.size() > 0) { worker = (WorkerThread) idle.remove (0); } - //} + } return (worker); } @@ -122,8 +122,8 @@ */ void returnWorker (WorkerThread worker) { - //synchronized (idle) { + synchronized (idle) { idle.add (worker); - //} + } } } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java =================================================================== /* * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v 1.1 2004/01/09 23:24:09 fhanik Exp $ * $Revision: 1.1 
$ * $Date: 2004/01/09 23:24:09 $ * * ==================================================================== * * The Apache Software License, Version 1.1 * * Copyright (c) 1999 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, if * any, must include the following acknowlegement: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowlegement may appear in the software itself, * if and wherever such third-party acknowlegements normally appear. * * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software * Foundation" must not be used to endorse or promote products derived * from this software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache" * nor may "Apache" appear in their names without prior written * permission of the Apache Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * [Additional notices, if required by prior licensing conditions] * */
package org.apache.catalina.cluster.tcp;

import java.net.InetAddress;
import java.net.Socket;
import java.util.LinkedList;
import java.util.List;
import java.util.Collections;

/**
 * An {@link IDataSender} that hands every message to one of several
 * {@link SocketSender} instances kept in a bounded pool, so that concurrent
 * callers of {@link #sendMessage(String, byte[])} do not serialize on a single
 * sender.
 *
 * <p>Senders are created lazily, up to {@link #getMaxPoolSocketLimit()}; once
 * the limit is reached callers block until a sender is handed back.</p>
 */
public class PooledSocketSender implements IDataSender {

    private static org.apache.commons.logging.Log log =
        org.apache.commons.logging.LogFactory.getLog( org.apache.catalina.cluster.tcp.SimpleTcpCluster.class );

    private final InetAddress address;
    private final int port;
    // The three socket/keep-alive bookkeeping fields below are not read anywhere in this class.
    private Socket sc = null;
    private boolean isSocketConnected = false;
    private boolean suspect;
    // 150 seconds socket read timeout (for acknowledgement).
    // NOTE(review): the original comment said "15 seconds" while the value is 150*1000;
    // the value is left untouched -- confirm which one was intended.
    private long ackTimeout = 150*1000;
    private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min
    private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
    private long keepAliveConnectTime = 0;
    private int keepAliveCount = 0;
    private int maxPoolSocketLimit = 25;

    private final SenderQueue senderQueue;

    /**
     * Creates a pooled sender for the given destination. No socket is opened here.
     *
     * @param host destination address
     * @param port destination port
     */
    public PooledSocketSender(InetAddress host, int port) {
        this.address = host;
        this.port = port;
        senderQueue = new SenderQueue(this, maxPoolSocketLimit);
    }

    public InetAddress getAddress() {
        return address;
    }

    public int getPort() {
        return port;
    }

    /**
     * Does nothing; connecting is left to the pooled socket senders.
     *
     * @throws java.io.IOException never thrown by this implementation
     */
    public void connect() throws java.io.IOException {
        //do nothing, happens in the socket sender itself
    }

    /**
     * Disconnects every pooled sender, idle or in use, and empties the pool.
     */
    public void disconnect() {
        senderQueue.close();
    }

    /**
     * Returns the internal connected flag. Nothing in this class ever sets
     * that flag, so this currently always reports <code>false</code>.
     */
    public boolean isConnected() {
        return isSocketConnected;
    }

    /**
     * Sets the ack timeout applied to senders created after this call.
     *
     * @param timeout timeout in milliseconds
     */
    public void setAckTimeout(long timeout) {
        this.ackTimeout = timeout;
    }

    public long getAckTimeout() {
        return ackTimeout;
    }

    /**
     * Sets the maximum number of senders that may be checked out at once.
     * The new limit is pushed into the live pool; previously it was only
     * stored here and never took effect after construction.
     *
     * @param limit maximum number of concurrently used senders
     */
    public void setMaxPoolSocketLimit(int limit) {
        maxPoolSocketLimit = limit;
        senderQueue.setLimit(limit);
    }

    public int getMaxPoolSocketLimit() {
        return maxPoolSocketLimit;
    }

    /**
     * Blocking send: borrows a sender from the pool, sends, and hands the
     * sender back.
     *
     * @param sessionId id of the session the data belongs to
     * @param data the bytes to send
     * @throws java.io.IOException if the underlying sender fails, or (as
     *         {@link java.io.InterruptedIOException}) if the calling thread is
     *         interrupted while waiting for a free sender
     */
    public void sendMessage(String sessionId, byte[] data) throws java.io.IOException {
        //get a socket sender from the pool (0 == wait until one is available)
        SocketSender sender = senderQueue.getSender(0);
        if ( sender == null ) {
            throw new java.io.InterruptedIOException(
                "Interrupted while waiting for a pooled socket sender, " + this);
        }
        try {
            //send the message
            sender.sendMessage(sessionId, data);
        } finally {
            //always return the connection to the pool; if a failed send kept
            //the sender checked out, the pool would shrink by one on every
            //error until getSender blocks forever
            senderQueue.returnSender(sender);
        }
    }

    public String toString() {
        StringBuffer buf = new StringBuffer("PooledSocketSender[");
        buf.append(getAddress()).append(":").append(getPort()).append("]");
        return buf.toString();
    }

    public boolean getSuspect() {
        return suspect;
    }

    public void setSuspect(boolean suspect) {
        this.suspect = suspect;
    }

    public long getKeepAliveTimeout() {
        return keepAliveTimeout;
    }

    /**
     * Sets the keep-alive timeout applied to senders created after this call.
     *
     * @param keepAliveTimeout timeout in milliseconds
     */
    public void setKeepAliveTimeout(long keepAliveTimeout) {
        this.keepAliveTimeout = keepAliveTimeout;
    }

    public int getKeepAliveMaxRequestCount() {
        return keepAliveMaxRequestCount;
    }

    /**
     * Sets the keep-alive request count applied to senders created after this call.
     *
     * @param keepAliveMaxRequestCount maximum number of requests per connection
     */
    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
    }

    /**
     * Bounded pool of {@link SocketSender}s. All state is guarded by
     * <code>mutex</code>.
     */
    private static class SenderQueue {
        private int limit;
        private final PooledSocketSender parent;
        private final LinkedList queue = new LinkedList();   // idle senders
        private final LinkedList inuse = new LinkedList();   // checked-out senders
        private final Object mutex = new Object();

        public SenderQueue(PooledSocketSender parent, int limit) {
            this.limit = limit;
            this.parent = parent;
        }

        /**
         * Changes the pool limit and wakes waiting threads so they can use
         * any capacity that just became available.
         */
        void setLimit(int limit) {
            synchronized (mutex) {
                this.limit = limit;
                mutex.notifyAll();
            }
        }

        /**
         * Borrows a sender, creating one if the pool is below its limit.
         *
         * @param timeout <code>0</code> waits until a sender is available, a
         *        positive value waits at most that many milliseconds, a
         *        negative value never waits
         * @return a sender, or <code>null</code> if the wait timed out or the
         *         calling thread was interrupted (interrupt status is restored)
         */
        public SocketSender getSender(long timeout) {
            long start = System.currentTimeMillis();
            synchronized (mutex) {
                while ( true ) {
                    SocketSender sender = null;
                    if ( queue.size() > 0 ) {
                        sender = (SocketSender) queue.removeFirst();
                    } else if ( inuse.size() < limit ) {
                        sender = getNewSocketSender();
                    }
                    if ( sender != null ) {
                        inuse.add(sender);
                        return sender;
                    }
                    //wait only for what is left of the timeout, so a wake-up
                    //that finds no free sender does not restart the full wait;
                    //waitFor == 0 means "until notified"
                    long waitFor = 0;
                    if ( timeout != 0 ) {
                        waitFor = timeout - (System.currentTimeMillis() - start);
                        if ( waitFor <= 0 ) {
                            return null;
                        }
                    }
                    try {
                        mutex.wait(waitFor);
                    } catch ( InterruptedException x ) {
                        //do not swallow the interrupt: restore the flag and give up
                        Thread.currentThread().interrupt();
                        log.warn("PoolSocketSender.senderQueue.getSender failed",x);
                        return null;
                    }
                }
            }
        }

        /**
         * Hands a borrowed sender back and wakes one waiting thread.
         */
        public void returnSender(SocketSender sender) {
            synchronized (mutex) {
                queue.add(sender);
                inuse.remove(sender);
                mutex.notify();
            }
        }

        /**
         * Creates a sender for the parent's destination, configured with the
         * parent's current keep-alive and ack settings.
         */
        private SocketSender getNewSocketSender() {
            SocketSender sender = new SocketSender(parent.getAddress(),parent.getPort());
            sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
            sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
            sender.setAckTimeout(parent.getAckTimeout());
            return sender;
        }

        /**
         * Disconnects every idle and checked-out sender and empties both lists.
         */
        public void close() {
            synchronized (mutex) {
                //drain from the head instead of get(i), which is O(n) per
                //call on a LinkedList
                while ( !queue.isEmpty() ) {
                    ((SocketSender) queue.removeFirst()).disconnect();
                }
                while ( !inuse.isEmpty() ) {
                    ((SocketSender) inuse.removeFirst()).disconnect();
                }
                //inuse is empty now, i.e. capacity was freed: let waiters retry
                mutex.notifyAll();
            }
        }
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]