pero 2005/03/25 14:21:27 Modified: modules/cluster/src/share/org/apache/catalina/cluster/io Jdk13ObjectReader.java ObjectReader.java XByteBuffer.java Log: add compress handling add some documentation Revision Changes Path 1.5 +42 -11 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java Index: Jdk13ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- Jdk13ObjectReader.java 1 Jul 2004 09:44:27 -0000 1.4 +++ Jdk13ObjectReader.java 25 Mar 2005 22:21:26 -0000 1.5 @@ -16,6 +16,10 @@ package org.apache.catalina.cluster.io; + +import java.net.Socket; +import org.apache.catalina.cluster.io.XByteBuffer; + /** * The object reader object is an object used in conjunction with * java.nio TCP messages. This object stores the message bytes in a @@ -26,24 +30,41 @@ * for message encoding and decoding. * * @author Filip Hanik + * @author Peter Rossbach * @version $Revision$, $Date$ */ - -import java.net.Socket; -import org.apache.catalina.cluster.io.XByteBuffer; public class Jdk13ObjectReader { private Socket socket; private ListenCallback callback; private XByteBuffer buffer; + /** + * use this socket and callback to receive messages + * @param socket listener socket + * @param callback SimpleTcpCluster listener + * @param compress is send message data compress or flat. + */ public Jdk13ObjectReader( Socket socket, - ListenCallback callback ) { + ListenCallback callback, boolean compress) { this.socket = socket; this.callback = callback; - this.buffer = new XByteBuffer(); + this.buffer = new XByteBuffer(compress); } + + /** + * Append new bytes to buffer. + * Is message complete receiver send message to callback + * @see org.apache.catalina.cluster.tcp.SimpleTcpCluster#messageDataReceived(byte[]) + * @see XByteBuffer#doesPackageExist() + * @see XByteBuffer#extractPackage(boolean) + * @param data new transfer buffer + * @param off offset + * @param len length in buffer + * @return number of messages that sended to callback + * @throws java.io.IOException + */ public int append(byte[] data,int off,int len) throws java.io.IOException { boolean result = false; buffer.append(data,off,len); @@ -54,22 +75,32 @@ callback.messageDataReceived(b); pkgCnt++; pkgExists = buffer.doesPackageExist(); - }//end if + } return pkgCnt; } + + /** + * send message to callback + * @see Jdk13ObjectReader#append(byte[], int, int) + * @return + * @throws java.io.IOException + */ public int execute() throws java.io.IOException { return append(new byte[0],0,0); } + /** + * write data to socket (ack) + * @see org.apache.catalina.cluster.tcp.Jdk13ReplicationListener#sendAck + * @param data + * @return + * @throws java.io.IOException + */ public int write(byte[] data) throws java.io.IOException { socket.getOutputStream().write(data); return 0; } - - - - } 1.7 +59 -23 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Index: ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- ObjectReader.java 1 Jul 2004 09:44:27 -0000 1.6 +++ ObjectReader.java 25 Mar 2005 22:21:26 -0000 1.7 @@ -13,9 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.catalina.cluster.io; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + /** * The object reader object is an object used in conjunction with * java.nio TCP messages. This object stores the message bytes in a @@ -26,59 +29,92 @@ * for message encoding and decoding. * * @author Filip Hanik + * @author Peter Rossbach * @version $Revision$, $Date$ */ +public class ObjectReader { -import java.nio.channels.SocketChannel; -import java.nio.channels.Selector; -import java.nio.ByteBuffer; -import org.apache.catalina.cluster.io.XByteBuffer; -public class ObjectReader -{ private SocketChannel channel; + private Selector selector; + private ListenCallback callback; + private XByteBuffer buffer; - public ObjectReader( SocketChannel channel, - Selector selector, - ListenCallback callback ) { + /** + * Create XByteBuffer and store parameter + * @param channel + * @param selector + * @param callback + */ + public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback, boolean isCompressed) { this.channel = channel; this.selector = selector; this.callback = callback; - this.buffer = new XByteBuffer(); + this.buffer = new XByteBuffer(isCompressed); } + /** + * get the current SimpleTcpCluster + * @return Returns the callback. + */ + public ListenCallback getCallback() { + return callback; + } - public SocketChannel getChannel() { + /** + * Get underlying NIO channel + * @return + */ + public SocketChannel getChannel() { return this.channel; } - public int append(byte[] data,int off,int len) throws java.io.IOException { - boolean result = false; + /** + * Append new bytes to buffer. + * @see XByteBuffer#countPackages() + * @param data new transfer buffer + * @param off offset + * @param len length in buffer + * @return number of messages that sended to callback + * @throws java.io.IOException + */ + public int append(byte[] data,int off,int len) throws java.io.IOException { buffer.append(data,off,len); int pkgCnt = buffer.countPackages(); return pkgCnt; } + /** + * Send buffer to cluster listener (callback) + * Is message complete receiver send message to callback + * @see org.apache.catalina.cluster.tcp.SimpleTcpCluster#messageDataReceived(byte[]) + * @see XByteBuffer#doesPackageExist() + * @see XByteBuffer#extractPackage() + * @return number of received packages/messages + * @throws java.io.IOException + */ public int execute() throws java.io.IOException { int pkgCnt = 0; boolean pkgExists = buffer.doesPackageExist(); while ( pkgExists ) { byte[] b = buffer.extractPackage(true); - callback.messageDataReceived(b); + getCallback().messageDataReceived(b); pkgCnt++; pkgExists = buffer.doesPackageExist(); - }//end if + } return pkgCnt; } - - public int write(ByteBuffer buf) - throws java.io.IOException { + + /** + * Write Ack to sender + * @param buf + * @return + * @throws java.io.IOException + */ + public int write(ByteBuffer buf) throws java.io.IOException { return getChannel().write(buf); } - - - } 1.11 +109 -65 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Index: XByteBuffer.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- XByteBuffer.java 17 Sep 2004 18:32:14 -0000 1.10 +++ XByteBuffer.java 25 Mar 2005 22:21:26 -0000 1.11 @@ -16,17 +16,29 @@ package org.apache.catalina.cluster.io; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + /** * The XByteBuffer provides a dual functionality. * One, it stores message bytes and automatically extends the byte buffer if needed.<BR> * Two, it can encode and decode packages so that they can be defined and identified * as they come in on a socket. + * <br/> + * Transfer package: + * <ul> + * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li> + * <li><b>SIZE</b> - 4 bytes - size of the data package</li> + * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li> + * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI> + * </ul> * * @author Filip Hanik + * @author Peter Rossbach * @version $Revision$, $Date$ */ - - public class XByteBuffer { @@ -34,52 +46,63 @@ org.apache.commons.logging.LogFactory.getLog( XByteBuffer.class ); /** - * This is a package header, 7 bytes + * This is a package header, 7 bytes (FLT2002) */ public static final byte[] START_DATA = {70,76,84,50,48,48,50}; + /** - * This is the package footer, 7 bytes + * This is the package footer, 7 bytes (TLF2003) */ public static final byte[] END_DATA = {84,76,70,50,48,48,51}; - //A package looks like, always. - /** - * START_DATA - 7 bytes - * SIZE - 4 bytes - size of the data package - * DATA - should be as many bytes as the prev SIZE - * END_DATA - 7 bytes - */ - + /** * Default size on the initial byte buffer */ static final int DEF_SIZE = 1024; + /** * Default size to extend the buffer with */ static final int DEF_EXT = 1024; + /** * Variable to hold the data */ protected byte[] buf = null; + /** * Current length of data in the buffer */ protected int bufSize = 0; /** + * Compress/Decompress user data + */ + protected boolean compress = true ; + + /** * Constructs a new XByteBuffer * @param size - the initial size of the byte buffer */ public XByteBuffer(int size) { buf = new byte[size]; - }//XByteBuffer + } /** * Constructs a new XByteBuffer with an initial size of 1024 bytes */ public XByteBuffer() { this(DEF_SIZE); - }//XByteBuffer + } + + /** + * Create Buffer and switch compress mode (off) + * @param compress + */ + public XByteBuffer(boolean compress) { + this(DEF_SIZE); + this.compress = compress ; + } /** * Returns the bytes in the buffer, in its exact length @@ -88,7 +111,7 @@ byte[] b = new byte[bufSize]; System.arraycopy(buf,0,b,0,bufSize); return b; - }//getBytes + } /** * Resets the buffer @@ -111,7 +134,7 @@ throw new IndexOutOfBoundsException(); } else if (len == 0) { return false; - }//end if + } int newcount = bufSize + len; if (newcount > buf.length) { @@ -128,7 +151,7 @@ return false; } return true; - }//append + } /** @@ -163,9 +186,9 @@ //reset the values start = pos + END_DATA.length; pos = start + START_DATA.length; - }//while + } return cnt; - }//getSize + } /** * Method to check if a package exists in this byte buffer. @@ -173,7 +196,7 @@ */ public boolean doesPackageExist() { return (countPackages()>0); - }//doesPackageExist + } /** * Extracts the message bytes from a package. @@ -181,32 +204,43 @@ * @param clearFromBuffer - if true, the package will be removed from the byte buffer * @return - returns the actual message bytes (header, size and footer not included). */ - public byte[] extractPackage(boolean clearFromBuffer) throws java.io.IOException { + public byte[] extractPackage(boolean clearFromBuffer) + throws java.io.IOException { int psize = countPackages(); - if ( psize == 0 ) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); + if (psize == 0) + throw new java.lang.IllegalStateException( + "No package exists in XByteBuffer"); int size = toInt(buf, START_DATA.length); byte[] data = new byte[size]; - System.arraycopy(buf,START_DATA.length+4,data,0,size); - if ( clearFromBuffer ) { + System.arraycopy(buf, START_DATA.length + 4, data, 0, size); + if (clearFromBuffer) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; bufSize = bufSize - totalsize; System.arraycopy(buf, totalsize, buf, 0, bufSize); } - java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(data); - java.util.zip.GZIPInputStream gin = new java.util.zip.GZIPInputStream(bin); - byte[] tmp = new byte[1024]; - byte[] result = new byte[0]; - int length = gin.read(tmp); - while ( length > 0 ) { - byte[] tmpdata = result; - result = new byte[result.length+length]; - System.arraycopy(tmpdata,0,result,0,tmpdata.length); - System.arraycopy(tmp,0,result,tmpdata.length,length); - length = gin.read(tmp); + byte[] result; + if (compress) { // decompress user data + // FIXME: This generate a lot of garbagge for messages larger than 1024 bytes + ByteArrayInputStream bin = + new ByteArrayInputStream(data); + GZIPInputStream gin = + new GZIPInputStream(bin); + byte[] tmp = new byte[1024]; + int length = gin.read(tmp); + result = new byte[0]; + while (length > 0) { + byte[] tmpdata = result; + result = new byte[result.length + length]; + System.arraycopy(tmpdata, 0, result, 0, tmpdata.length); + System.arraycopy(tmp, 0, result, tmpdata.length, length); + length = gin.read(tmp); + } + gin.close(); + } else { // send data direct + result = data; } - gin.close(); return result; - }//extractPackage + } /** * Convert four bytes to an int @@ -220,7 +254,7 @@ ( ( ( (int) b[off+2]) & 0xFF) << 8) + ( ( ( (int) b[off+1]) & 0xFF) << 16) + ( ( ( (int) b[off+0]) & 0xFF) << 24); - }//toInt + } /** * Convert eight bytes to a long @@ -238,8 +272,7 @@ ( ( ( (long) b[off+2]) & 0xFF) << 40) + ( ( ( (long) b[off+1]) & 0xFF) << 48) + ( ( ( (long) b[off+0]) & 0xFF) << 56); - }//toInt - + } /** * Converts an integer to four bytes @@ -256,8 +289,7 @@ n >>>= 8; b[0] = (byte) (n); return b; - } //toBytes - + } /** * Converts an long to eight bytes @@ -282,8 +314,7 @@ n >>>= 8; b[0] = (byte) (n); return b; - } //toBytes - + } /** * Similar to a String.IndexOf, but uses pure bytes @@ -308,7 +339,7 @@ if (first == src[pos]) break; pos++; - } //while + } if (pos >= srclen) return -1; @@ -326,30 +357,43 @@ return -1; //no more matches possible else pos++; - } //while + } return result; - } //firstIndexOf + } /** * Creates a complete data package * @param indata - the message data to be contained within the package + * @param compress - compress message data or not * @return - a full package (header,size,data,footer) */ - public static byte[] createDataPackage(byte[] indata) throws java.io.IOException { - java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream(indata.length/2); - java.util.zip.GZIPOutputStream gout = new java.util.zip.GZIPOutputStream(bout); - gout.write(indata); - gout.flush(); - gout.close(); - byte[] data = bout.toByteArray(); - byte[] result = new byte[START_DATA.length+4+data.length+END_DATA.length]; - System.arraycopy(START_DATA,0,result,0,START_DATA.length); - System.arraycopy(toBytes(data.length),0,result,START_DATA.length,4); - System.arraycopy(data,0,result,START_DATA.length+4,data.length); - System.arraycopy(END_DATA,0,result,START_DATA.length+4+data.length,END_DATA.length); + public static byte[] createDataPackage(byte[] indata, boolean compress) + throws java.io.IOException { + byte[] data; + if (compress) { + ByteArrayOutputStream bout = new ByteArrayOutputStream( + indata.length / 2); + GZIPOutputStream gout = new GZIPOutputStream(bout); + gout.write(indata); + gout.flush(); + gout.close(); + data = bout.toByteArray(); + } else { + data = indata; + } + byte[] result = new byte[START_DATA.length + 4 + data.length + + END_DATA.length]; + System.arraycopy(START_DATA, 0, result, 0, START_DATA.length); + System.arraycopy(toBytes(data.length), 0, result, START_DATA.length, 4); + System.arraycopy(data, 0, result, START_DATA.length + 4, data.length); + System.arraycopy(END_DATA, 0, result, START_DATA.length + 4 + + data.length, END_DATA.length); + return result; - }//createDataPackage + } + + // FIXME: extract this to test code! public static void main(String[] args) throws Exception { log.info("Before="+Integer.MAX_VALUE); byte[] d = toBytes(Integer.MAX_VALUE); @@ -364,9 +408,9 @@ d = toBytes((long)4564564); log.info("After=" + toLong(d, 0)); - byte[] d1 = createDataPackage(new byte[] {1}); - byte[] d2 = createDataPackage(new byte[] {2}); - byte[] d3 = createDataPackage(new byte[] {3}); + byte[] d1 = createDataPackage(new byte[] {1},true); + byte[] d2 = createDataPackage(new byte[] {2},true); + byte[] d3 = createDataPackage(new byte[] {3},true); byte[] test = new byte[d1.length+d2.length+d3.length+5]; System.arraycopy(d1,0,test,0,d1.length); System.arraycopy(d2,0,test,d1.length,d2.length); @@ -396,4 +440,4 @@ log.info(buf); } -}//class +}
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]