pero        2005/03/25 14:21:27

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/io
                        Jdk13ObjectReader.java ObjectReader.java
                        XByteBuffer.java
  Log:
  add compress handling
  add some documentation
  
  Revision  Changes    Path
  1.5       +42 -11    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java
  
  Index: Jdk13ObjectReader.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Jdk13ObjectReader.java    1 Jul 2004 09:44:27 -0000       1.4
  +++ Jdk13ObjectReader.java    25 Mar 2005 22:21:26 -0000      1.5
  @@ -16,6 +16,10 @@
   
   package org.apache.catalina.cluster.io;
   
  +
  +import java.net.Socket;
  +import org.apache.catalina.cluster.io.XByteBuffer;
  +
   /**
    * The object reader object is an object used in conjunction with
    * java.nio TCP messages. This object stores the message bytes in a
  @@ -26,24 +30,41 @@
    * for message encoding and decoding.
    *
    * @author Filip Hanik
  + * @author Peter Rossbach
    * @version $Revision$, $Date$
    */
  -
  -import java.net.Socket;
  -import org.apache.catalina.cluster.io.XByteBuffer;
   public class Jdk13ObjectReader
   {
       private Socket socket;
       private ListenCallback callback;
       private XByteBuffer buffer;
   
  +    /**
  +     * use this socket and callback to receive messages
  +     * @param socket listener socket
  +     * @param callback SimpleTcpCluster listener
  +     * @param compress is send message data compress or flat.
  +     */
       public Jdk13ObjectReader( Socket socket,
  -                               ListenCallback callback )  {
  +                               ListenCallback callback, boolean compress)  {
           this.socket = socket;
           this.callback = callback;
  -        this.buffer = new XByteBuffer();
  +        this.buffer = new XByteBuffer(compress);
       }
   
  +    
  +    /**
  +     * Append new bytes to buffer. 
  +     * Is message complete receiver send message to callback
  +     * @see 
org.apache.catalina.cluster.tcp.SimpleTcpCluster#messageDataReceived(byte[])
  +     * @see XByteBuffer#doesPackageExist()
  +     * @see XByteBuffer#extractPackage(boolean)
  +     * @param data new transfer buffer
  +     * @param off offset
  +     * @param len length in buffer
  +     * @return number of messages that sended to callback
  +     * @throws java.io.IOException
  +     */
       public int append(byte[] data,int off,int len) throws 
java.io.IOException {
           boolean result = false;
           buffer.append(data,off,len);
  @@ -54,22 +75,32 @@
               callback.messageDataReceived(b);
               pkgCnt++;
               pkgExists = buffer.doesPackageExist();
  -        }//end if
  +        }
           return pkgCnt;
       }
   
  +    
  +    /**
  +     * send message to callback
  +     * @see Jdk13ObjectReader#append(byte[], int, int)
  +     * @return
  +     * @throws java.io.IOException
  +     */
       public int execute() throws java.io.IOException {
           return append(new byte[0],0,0);
       }
   
  +    /**
  +     * write data to socket (ack)
  +     * @see org.apache.catalina.cluster.tcp.Jdk13ReplicationListener#sendAck
  +     * @param data
  +     * @return
  +     * @throws java.io.IOException
  +     */
       public int write(byte[] data)
          throws java.io.IOException {
          socket.getOutputStream().write(data);
          return 0;
   
       }
  -
  -
  -
  -
   }
  
  
  
  1.7       +59 -23    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
  
  Index: ObjectReader.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ObjectReader.java 1 Jul 2004 09:44:27 -0000       1.6
  +++ ObjectReader.java 25 Mar 2005 22:21:26 -0000      1.7
  @@ -13,9 +13,12 @@
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
  -
   package org.apache.catalina.cluster.io;
   
  +import java.nio.ByteBuffer;
  +import java.nio.channels.Selector;
  +import java.nio.channels.SocketChannel;
  +
   /**
    * The object reader object is an object used in conjunction with
    * java.nio TCP messages. This object stores the message bytes in a
  @@ -26,59 +29,92 @@
    * for message encoding and decoding.
    *
    * @author Filip Hanik
  + * @author Peter Rossbach
    * @version $Revision$, $Date$
    */
  +public class ObjectReader {
   
  -import java.nio.channels.SocketChannel;
  -import java.nio.channels.Selector;
  -import java.nio.ByteBuffer;
  -import org.apache.catalina.cluster.io.XByteBuffer;
  -public class ObjectReader
  -{
       private SocketChannel channel;
  +
       private Selector selector;
  +
       private ListenCallback callback;
  +
       private XByteBuffer buffer;
   
  -    public ObjectReader( SocketChannel channel,
  -                         Selector selector,
  -                         ListenCallback callback )  {
  +    /**
  +     * Create XByteBuffer and store parameter
  +     * @param channel
  +     * @param selector
  +     * @param callback
  +     */
  +    public ObjectReader(SocketChannel channel, Selector selector, 
ListenCallback callback, boolean isCompressed) {
           this.channel = channel;
           this.selector = selector;
           this.callback = callback;
  -        this.buffer = new XByteBuffer();
  +        this.buffer = new XByteBuffer(isCompressed);
       }
   
  +    /**
  +     * get the current SimpleTcpCluster
  +     * @return Returns the callback.
  +     */
  +    public ListenCallback getCallback() {
  +        return callback;
  +    }
   
  -    public SocketChannel getChannel()  {
  +    /**
  +     * Get underlying NIO channel
  +     * @return
  +     */
  +    public SocketChannel getChannel() {
           return this.channel;
       }
   
  -    public int append(byte[] data,int off,int len) throws 
java.io.IOException {
  -        boolean result = false;
  +    /**
  +     * Append new bytes to buffer. 
  +     * @see XByteBuffer#countPackages()
  +     * @param data new transfer buffer
  +     * @param off offset
  +     * @param len length in buffer
  +     * @return number of messages that sended to callback
  +     * @throws java.io.IOException
  +     */
  +     public int append(byte[] data,int off,int len) throws 
java.io.IOException {
           buffer.append(data,off,len);
           int pkgCnt = buffer.countPackages();
           return pkgCnt;
       }
   
  +    /**
  +     * Send buffer to cluster listener (callback)
  +     * Is message complete receiver send message to callback
  +     * @see 
org.apache.catalina.cluster.tcp.SimpleTcpCluster#messageDataReceived(byte[])
  +     * @see XByteBuffer#doesPackageExist()
  +     * @see XByteBuffer#extractPackage()
  +     * @return number of received packages/messages
  +     * @throws java.io.IOException
  +     */
       public int execute() throws java.io.IOException {
           int pkgCnt = 0;
           boolean pkgExists = buffer.doesPackageExist();
           while ( pkgExists ) {
               byte[] b = buffer.extractPackage(true);
  -            callback.messageDataReceived(b);
  +            getCallback().messageDataReceived(b);
               pkgCnt++;
               pkgExists = buffer.doesPackageExist();
  -        }//end if
  +        }
           return pkgCnt;
       }
  -
  -    public int write(ByteBuffer buf)
  -       throws java.io.IOException {
  +    
  +    /**
  +     * Write Ack to sender
  +     * @param buf
  +     * @return
  +     * @throws java.io.IOException
  +     */
  +    public int write(ByteBuffer buf) throws java.io.IOException {
           return getChannel().write(buf);
       }
   
  -
  -
  -
   }
  
  
  
  1.11      +109 -65   
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
  
  Index: XByteBuffer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- XByteBuffer.java  17 Sep 2004 18:32:14 -0000      1.10
  +++ XByteBuffer.java  25 Mar 2005 22:21:26 -0000      1.11
  @@ -16,17 +16,29 @@
   
   package org.apache.catalina.cluster.io;
   
  +import java.io.ByteArrayInputStream;
  +import java.io.ByteArrayOutputStream;
  +import java.util.zip.GZIPInputStream;
  +import java.util.zip.GZIPOutputStream;
  +
   /**
    * The XByteBuffer provides a dual functionality.
    * One, it stores message bytes and automatically extends the byte buffer if 
needed.<BR>
    * Two, it can encode and decode packages so that they can be defined and 
identified
    * as they come in on a socket.
  + * <br/>
  + * Transfer package:
  + * <ul>
  + * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
  + * <li><b>SIZE</b>      - 4 bytes - size of the data package</li>
  + * <li><b>DATA</b>      - should be as many bytes as the prev SIZE</li>
  + * <li><b>END_DATA</b>  - 7 bytes - <i>TLF2003</i></lI>
  + * </ul>
    *
    * @author Filip Hanik
  + * @author Peter Rossbach
    * @version $Revision$, $Date$
    */
  -
  -
   public class XByteBuffer
   {
       
  @@ -34,52 +46,63 @@
           org.apache.commons.logging.LogFactory.getLog( XByteBuffer.class );
       
       /**
  -     * This is a package header, 7 bytes
  +     * This is a package header, 7 bytes (FLT2002)
        */
       public static final byte[] START_DATA = {70,76,84,50,48,48,50};
  +    
       /**
  -     * This is the package footer, 7 bytes
  +     * This is the package footer, 7 bytes (TLF2003)
        */
       public static final byte[] END_DATA = {84,76,70,50,48,48,51};
  -    //A package looks like, always.
  -    /**
  -     * START_DATA - 7 bytes
  -     * SIZE       - 4 bytes - size of the data package
  -     * DATA       - should be as many bytes as the prev SIZE
  -     * END_DATA   - 7 bytes
  -     */
  -
  + 
       /**
        * Default size on the initial byte buffer
        */
       static final int DEF_SIZE = 1024;
  + 
       /**
        * Default size to extend the buffer with
        */
       static final int DEF_EXT  = 1024;
  +    
       /**
        * Variable to hold the data
        */
       protected byte[] buf = null;
  +    
       /**
        * Current length of data in the buffer
        */
       protected int bufSize = 0;
   
       /**
  +     * Compress/Decompress user data
  +     */
  +    protected boolean compress = true ;
  +    
  +    /**
        * Constructs a new XByteBuffer
        * @param size - the initial size of the byte buffer
        */
       public XByteBuffer(int size) {
           buf = new byte[size];
  -    }//XByteBuffer
  +    }
   
       /**
        * Constructs a new XByteBuffer with an initial size of 1024 bytes
        */
       public XByteBuffer()  {
           this(DEF_SIZE);
  -    }//XByteBuffer
  +    }
  +
  +    /**
  +     * Create Buffer and switch compress mode (off)
  +     * @param compress
  +     */
  +    public XByteBuffer(boolean compress)  {
  +        this(DEF_SIZE);
  +        this.compress = compress ;
  +    }
   
       /**
        * Returns the bytes in the buffer, in its exact length
  @@ -88,7 +111,7 @@
           byte[] b = new byte[bufSize];
           System.arraycopy(buf,0,b,0,bufSize);
           return b;
  -    }//getBytes
  +    }
   
       /**
        * Resets the buffer
  @@ -111,7 +134,7 @@
               throw new IndexOutOfBoundsException();
           } else if (len == 0) {
               return false;
  -        }//end if
  +        }
   
           int newcount = bufSize + len;
           if (newcount > buf.length) {
  @@ -128,7 +151,7 @@
               return false;
           }
           return true;
  -    }//append
  +    }
   
   
       /**
  @@ -163,9 +186,9 @@
               //reset the values
               start = pos + END_DATA.length;
               pos = start + START_DATA.length;
  -        }//while
  +        }
           return cnt;
  -    }//getSize
  +    }
   
       /**
        * Method to check if a package exists in this byte buffer.
  @@ -173,7 +196,7 @@
        */
       public boolean doesPackageExist()  {
           return (countPackages()>0);
  -    }//doesPackageExist
  +    }
   
       /**
        * Extracts the message bytes from a package.
  @@ -181,32 +204,43 @@
        * @param clearFromBuffer - if true, the package will be removed from 
the byte buffer
        * @return - returns the actual message bytes (header, size and footer 
not included).
        */
  -    public byte[] extractPackage(boolean clearFromBuffer) throws 
java.io.IOException {
  +    public byte[] extractPackage(boolean clearFromBuffer)
  +            throws java.io.IOException {
           int psize = countPackages();
  -        if ( psize == 0 ) throw new java.lang.IllegalStateException("No 
package exists in XByteBuffer");
  +        if (psize == 0)
  +            throw new java.lang.IllegalStateException(
  +                    "No package exists in XByteBuffer");
           int size = toInt(buf, START_DATA.length);
           byte[] data = new byte[size];
  -        System.arraycopy(buf,START_DATA.length+4,data,0,size);
  -        if ( clearFromBuffer ) {
  +        System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
  +        if (clearFromBuffer) {
               int totalsize = START_DATA.length + 4 + size + END_DATA.length;
               bufSize = bufSize - totalsize;
               System.arraycopy(buf, totalsize, buf, 0, bufSize);
           }
  -        java.io.ByteArrayInputStream bin = new 
java.io.ByteArrayInputStream(data);
  -        java.util.zip.GZIPInputStream gin = new 
java.util.zip.GZIPInputStream(bin);
  -        byte[] tmp = new byte[1024];
  -        byte[] result = new byte[0];
  -        int length = gin.read(tmp);
  -        while ( length > 0 ) {
  -            byte[] tmpdata = result;
  -            result = new byte[result.length+length];
  -            System.arraycopy(tmpdata,0,result,0,tmpdata.length);
  -            System.arraycopy(tmp,0,result,tmpdata.length,length);
  -            length = gin.read(tmp);
  +        byte[] result;
  +        if (compress) { // decompress user data
  +            // FIXME: This generate a lot of garbagge for messages larger 
than 1024 bytes
  +            ByteArrayInputStream bin = 
  +                new ByteArrayInputStream(data);
  +            GZIPInputStream gin = 
  +                new GZIPInputStream(bin);
  +            byte[] tmp = new byte[1024];
  +            int length = gin.read(tmp);
  +            result = new byte[0];
  +            while (length > 0) {
  +                byte[] tmpdata = result;
  +                result = new byte[result.length + length];
  +                System.arraycopy(tmpdata, 0, result, 0, tmpdata.length);
  +                System.arraycopy(tmp, 0, result, tmpdata.length, length);
  +                length = gin.read(tmp);
  +            }
  +            gin.close();
  +        } else { // send data direct 
  +            result = data;
           }
  -        gin.close();
           return result;
  -    }//extractPackage
  +    }
   
       /**
        * Convert four bytes to an int
  @@ -220,7 +254,7 @@
               ( ( ( (int) b[off+2]) & 0xFF) << 8) +
               ( ( ( (int) b[off+1]) & 0xFF) << 16) +
               ( ( ( (int) b[off+0]) & 0xFF) << 24);
  -    }//toInt
  +    }
   
       /**
        * Convert eight bytes to a long
  @@ -238,8 +272,7 @@
               ( ( ( (long) b[off+2]) & 0xFF) << 40) +
               ( ( ( (long) b[off+1]) & 0xFF) << 48) +
               ( ( ( (long) b[off+0]) & 0xFF) << 56);
  -    }//toInt
  -
  +    }
   
       /**
        * Converts an integer to four bytes
  @@ -256,8 +289,7 @@
           n >>>= 8;
           b[0] = (byte) (n);
           return b;
  -    } //toBytes
  -
  +    }
   
       /**
        * Converts an long to eight bytes
  @@ -282,8 +314,7 @@
           n >>>= 8;
           b[0] = (byte) (n);
           return b;
  -    } //toBytes
  -
  +    }
   
       /**
        * Similar to a String.IndexOf, but uses pure bytes
  @@ -308,7 +339,7 @@
                   if (first == src[pos])
                       break;
                   pos++;
  -            } //while
  +            }
               if (pos >= srclen)
                   return -1;
   
  @@ -326,30 +357,43 @@
                   return -1; //no more matches possible
               else
                   pos++;
  -        } //while
  +        }
           return result;
  -    } //firstIndexOf
  +    }
   
       /**
        * Creates a complete data package
        * @param indata - the message data to be contained within the package
  +     * @param compress - compress message data or not
        * @return - a full package (header,size,data,footer)
        */
  -    public static byte[] createDataPackage(byte[] indata) throws 
java.io.IOException  {
  -        java.io.ByteArrayOutputStream bout = new 
java.io.ByteArrayOutputStream(indata.length/2);
  -        java.util.zip.GZIPOutputStream gout = new 
java.util.zip.GZIPOutputStream(bout);
  -        gout.write(indata);
  -        gout.flush();
  -        gout.close();
  -        byte[] data = bout.toByteArray();
  -        byte[] result = new 
byte[START_DATA.length+4+data.length+END_DATA.length];
  -        System.arraycopy(START_DATA,0,result,0,START_DATA.length);
  -        System.arraycopy(toBytes(data.length),0,result,START_DATA.length,4);
  -        System.arraycopy(data,0,result,START_DATA.length+4,data.length);
  -        
System.arraycopy(END_DATA,0,result,START_DATA.length+4+data.length,END_DATA.length);
  +    public static byte[] createDataPackage(byte[] indata, boolean compress)
  +            throws java.io.IOException {
  +        byte[] data;
  +        if (compress) {
  +            ByteArrayOutputStream bout = new ByteArrayOutputStream(
  +                    indata.length / 2);
  +            GZIPOutputStream gout = new GZIPOutputStream(bout);
  +            gout.write(indata);
  +            gout.flush();
  +            gout.close();
  +            data = bout.toByteArray();
  +        } else {
  +            data = indata;
  +        }
  +        byte[] result = new byte[START_DATA.length + 4 + data.length
  +                + END_DATA.length];
  +        System.arraycopy(START_DATA, 0, result, 0, START_DATA.length);
  +        System.arraycopy(toBytes(data.length), 0, result, START_DATA.length, 
4);
  +        System.arraycopy(data, 0, result, START_DATA.length + 4, 
data.length);
  +        System.arraycopy(END_DATA, 0, result, START_DATA.length + 4
  +                + data.length, END_DATA.length);
  +
           return result;
  -    }//createDataPackage
   
  +    }
  +
  +    // FIXME: extract this to test code!
       public static void main(String[] args) throws Exception {
          log.info("Before="+Integer.MAX_VALUE);
          byte[] d = toBytes(Integer.MAX_VALUE);
  @@ -364,9 +408,9 @@
          d = toBytes((long)4564564);
          log.info("After=" + toLong(d, 0));
   
  -       byte[] d1 = createDataPackage(new byte[] {1});
  -       byte[] d2 = createDataPackage(new byte[] {2});
  -       byte[] d3 = createDataPackage(new byte[] {3});
  +       byte[] d1 = createDataPackage(new byte[] {1},true);
  +       byte[] d2 = createDataPackage(new byte[] {2},true);
  +       byte[] d3 = createDataPackage(new byte[] {3},true);
          byte[] test = new byte[d1.length+d2.length+d3.length+5];
          System.arraycopy(d1,0,test,0,d1.length);
          System.arraycopy(d2,0,test,d1.length,d2.length);
  @@ -396,4 +440,4 @@
           log.info(buf);
       }
   
  -}//class
  +}
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to