fhanik      2004/01/09 15:24:09

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/io
                        ObjectReader.java XByteBuffer.java
               modules/cluster/src/share/org/apache/catalina/cluster/session
                        SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        IDataSenderFactory.java SimpleTcpCluster.java
                        SocketSender.java TcpReplicationThread.java
                        ThreadPool.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        PooledSocketSender.java
  Log:
  Implemented a socket pool for replication, since the synchronized
  send had become a bottleneck. This is a dramatic performance improvement.
  
  Revision  Changes    Path
  1.4       +9 -8      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
  
  Index: ObjectReader.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ObjectReader.java 19 Dec 2003 21:22:13 -0000      1.3
  +++ ObjectReader.java 9 Jan 2004 23:24:08 -0000       1.4
  @@ -105,6 +105,11 @@
       public int append(byte[] data,int off,int len) throws java.io.IOException {
           boolean result = false;
           buffer.append(data,off,len);
  +        int pkgCnt = buffer.countPackages();
  +        return pkgCnt;
  +    }
  +
  +    public int execute() throws java.io.IOException {
           int pkgCnt = 0;
           boolean pkgExists = buffer.doesPackageExist();
           while ( pkgExists ) {
  @@ -114,10 +119,6 @@
               pkgExists = buffer.doesPackageExist();
           }//end if
           return pkgCnt;
  -    }
  -
  -    public int execute() throws java.io.IOException {
  -        return append(new byte[0],0,0);
       }
   
       public int write(ByteBuffer buf)
  
  
  
  1.5       +66 -24    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
  
  Index: XByteBuffer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- XByteBuffer.java  20 Dec 2003 00:48:52 -0000      1.4
  +++ XByteBuffer.java  9 Jan 2004 23:24:08 -0000       1.5
  @@ -180,24 +180,35 @@
        * within the buffer
        * @return - true if a complete package (header,size,data,footer) exists within 
the buffer
        */
  -    protected int packageExists()
  +    public int countPackages()
       {
  +        int cnt = 0;
           int pos = START_DATA.length;
  -        //first check start header
  -        int index = this.firstIndexOf(buf,0,START_DATA);
  -        //if the header (START_DATA) isn't the first thing or
  -        //the buffer isn't even 10 bytes
  -        if ( index != 0 || (bufSize<10) ) return 0;
  -        //then get the size 4 bytes
  -        int size = toInt(buf,pos);
  -        //now the total buffer has to be long enough to hold
  -        //START_DATA.length+4+size+END_DATA.length
  -        pos = START_DATA.length+4+size;
  -        if ( (pos+END_DATA.length) > bufSize ) return 0;
  -        //and finally check the footer of the package END_DATA
  -        int newpos = firstIndexOf(buf,pos,END_DATA);
  -        if ( newpos != pos ) return 0;
  -        return size;
  +        int start = 0;
  +
  +        while ( start < bufSize ) {
  +            //first check start header
  +            int index = this.firstIndexOf(buf,start,START_DATA);
  +            //if the header (START_DATA) isn't the first thing or
  +            //the buffer isn't even 10 bytes
  +            if ( index != start || ((bufSize-start)<10) ) break;
  +            //then get the size 4 bytes
  +            int size = toInt(buf, pos);
  +            //now the total buffer has to be long enough to hold
  +            //START_DATA.length+4+size+END_DATA.length
  +            pos = start + START_DATA.length + 4 + size;
  +            if ( (pos + END_DATA.length) > bufSize) break;
  +            //and finally check the footer of the package END_DATA
  +            int newpos = firstIndexOf(buf, pos, END_DATA);
  +            //mismatch, there is no package
  +            if (newpos != pos) break;
  +            //increase the packet count
  +            cnt++;
  +            //reset the values
  +            start = pos + END_DATA.length;
  +            pos = start + START_DATA.length;
  +        }//while
  +        return cnt;
       }//getSize
   
       /**
  @@ -205,7 +216,7 @@
        * @return - true if a complete package (header,size,data,footer) exists within 
the buffer
        */
       public boolean doesPackageExist()  {
  -        return (packageExists()>0);
  +        return (countPackages()>0);
       }//doesPackageExist
   
       /**
  @@ -215,8 +226,9 @@
        * @return - returns the actual message bytes (header, size and footer not 
included).
        */
       public byte[] extractPackage(boolean clearFromBuffer) throws 
java.io.IOException {
  -        int size = packageExists();
  -        if ( size == 0 ) throw new java.lang.IllegalStateException("No package 
exists in XByteBuffer");
  +        int psize = countPackages();
  +        if ( psize == 0 ) throw new java.lang.IllegalStateException("No package 
exists in XByteBuffer");
  +        int size = toInt(buf, START_DATA.length);
           byte[] data = new byte[size];
           System.arraycopy(buf,START_DATA.length+4,data,0,size);
           if ( clearFromBuffer ) {
  @@ -382,7 +394,7 @@
           return result;
       }//createDataPackage
   
  -    public static void main(String[] args) {
  +    public static void main(String[] args) throws Exception {
          System.out.println("Before="+Integer.MAX_VALUE);
          byte[] d = toBytes(Integer.MAX_VALUE);
          System.out.println("After="+toInt(d,0));
  @@ -395,6 +407,36 @@
          System.out.println("Before=" + 4564564);
          d = toBytes((long)4564564);
          System.out.println("After=" + toLong(d, 0));
  +
  +       byte[] d1 = createDataPackage(new byte[] {1});
  +       byte[] d2 = createDataPackage(new byte[] {2});
  +       byte[] d3 = createDataPackage(new byte[] {3});
  +       byte[] test = new byte[d1.length+d2.length+d3.length+5];
  +       System.arraycopy(d1,0,test,0,d1.length);
  +       System.arraycopy(d2,0,test,d1.length,d2.length);
  +       System.arraycopy(d3,0,test,d2.length+d1.length,d3.length);
  +       printBuf(d1);
  +       printBuf(d2);
  +       printBuf(d3);
  +       printBuf(test);
  +       XByteBuffer b = new XByteBuffer();
  +       b.append(test,0,test.length);
  +       int s = b.countPackages();
  +       System.out.println("Nr of packages="+s);
  +       while ( s > 0 ) {
  +           d = b.extractPackage(true);
  +           System.out.print("Package d1=");
  +           printBuf(d);
  +           s--;
  +       }//while
  +
  +    }
  +
  +    public static void printBuf(byte[] b) {
  +        for ( int i=0; i<b.length; i++ ) {
  +            System.out.print(b[i] + " ");
  +        }
  +        System.out.println();
       }
   
   }//class
  
  
  
  1.18      +4 -4      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- SimpleTcpReplicationManager.java  15 Dec 2003 21:33:06 -0000      1.17
  +++ SimpleTcpReplicationManager.java  9 Jan 2004 23:24:09 -0000       1.18
  @@ -506,7 +506,7 @@
                       reqNow = System.currentTimeMillis();
                       isTimeout=((reqNow-reqStart)>(1000*60));
                   } while ( (!isStateTransferred()) && (!isTimeout));
  -                if ( isTimeout ) {
  +                if ( isTimeout || (!isStateTransferred()) ) {
                       log.error("Manager["+getName()+"], No session state received, 
timing out.");
                   }else {
                       log.info("Manager["+getName()+"], session state received in 
"+(reqNow-reqStart)+" ms.");
  
  
  
  1.2       +20 -6     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java
  
  Index: IDataSenderFactory.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- IDataSenderFactory.java   18 Apr 2003 02:51:24 -0000      1.1
  +++ IDataSenderFactory.java   9 Jan 2004 23:24:09 -0000       1.2
  @@ -69,15 +69,29 @@
       }
       public static final String SYNC_MODE="synchronous";
       public static final String ASYNC_MODE="asynchronous";
  -    public synchronized static IDataSender getIDataSender(String mode, Member mbr) 
  +    public static final String POOLED_SYNC_MODE="pooled";
  +
  +    public synchronized static IDataSender getIDataSender(String mode, Member mbr)
       throws java.io.IOException {
           if (SYNC_MODE.equals(mode) )
               return new 
SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
           else if ( ASYNC_MODE.equals(mode) )
               return new 
AsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
  +        if (POOLED_SYNC_MODE.equals(mode) )
  +            return new 
PooledSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
           else
               throw new java.io.IOException("Invalid replication mode="+mode);
       }
  -    
   
  -}
  \ No newline at end of file
  +    public static String validateMode(String mode) {
  +        if (SYNC_MODE.equals(mode) ||
  +            ASYNC_MODE.equals(mode) ||
  +            POOLED_SYNC_MODE.equals(mode) ) {
  +            return null;
  +        } else {
  +            return "Replication mode has to be '"+SYNC_MODE+"', '"+ASYNC_MODE+"' or 
'"+POOLED_SYNC_MODE+"'";
  +        }
  +    }
  +
  +
  +}
  
  
  
  1.23      +9 -10     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.22
  retrieving revision 1.23
  diff -u -r1.22 -r1.23
  --- SimpleTcpCluster.java     9 Jan 2004 02:50:54 -0000       1.22
  +++ SimpleTcpCluster.java     9 Jan 2004 23:24:09 -0000       1.23
  @@ -319,12 +319,12 @@
       }
   
       public void setReplicationMode(String mode) {
  -        if ("synchronous".equals(mode) ||
  -            "asynchronous".equals(mode)) {
  +        String msg = IDataSenderFactory.validateMode(mode);
  +        if ( msg == null ) {
               log.debug("Setting replcation mode to "+mode);
               this.replicationMode = mode;
           } else
  -            throw new IllegalArgumentException("Replication mode must be either 
synchronous or asynchronous");
  +            throw new IllegalArgumentException(msg);
   
       }
       /**
  @@ -496,8 +496,8 @@
                                               this.tcpAddress,
                                               this.tcpPort,
                                               this.tcpSelectorTimeout,
  -                                            "synchronous".equals(this.
  -                    replicationMode));
  +                                            
IDataSenderFactory.SYNC_MODE.equals(replicationMode) ||
  +                                            
IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode));
                   mReplicationListener.setDaemon(true);
                   mReplicationListener.start();
               }
  @@ -592,7 +592,6 @@
           log.info("Received member disappeared:"+member);
           try
           {
  -            log.info("Replication member disappeared:" + member);
               Member mbr = member;
               mReplicationTransmitter.remove(InetAddress.getByName(mbr.getHost()),
                                    mbr.getPort());
  
  
  
  1.9       +12 -4     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SocketSender.java 19 Dec 2003 22:59:24 -0000      1.8
  +++ SocketSender.java 9 Jan 2004 23:24:09 -0000       1.9
  @@ -132,12 +132,20 @@
           return isSocketConnected;
       }
   
  -    private void checkIfDisconnect() {
  +    public void checkIfDisconnect() {
           long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
           if ( (ctime > this.keepAliveTimeout) ||
                (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) {
               disconnect();
           }
  +    }
  +
  +    public void setAckTimeout(long timeout) {
  +        this.ackTimeout = timeout;
  +    }
  +
  +    public long getAckTimeout() {
  +        return ackTimeout;
       }
   
       /**
  
  
  
  1.7       +9 -11     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- TcpReplicationThread.java 9 Jan 2004 02:50:54 -0000       1.6
  +++ TcpReplicationThread.java 9 Jan 2004 23:24:09 -0000       1.7
  @@ -169,12 +169,6 @@
           while ((count = channel.read (buffer)) > 0) {
               buffer.flip();           // make buffer readable
               int pkgcnt = reader.append(buffer.array(),0,count);
  -            while ( pkgcnt > 0 ) {
  -                if (synchronous) {
  -                    sendAck(key,channel);
  -                } //end if
  -                pkgcnt--;
  -            }
               buffer.clear();          // make buffer empty
           }
           //check to see if any data is available
  @@ -196,8 +190,12 @@
           key.selector().wakeup();
       }
   
  -    private void sendAck(SelectionKey key, SocketChannel channel) throws 
java.io.IOException {
  +    private void sendAck(SelectionKey key, SocketChannel channel) {
           //send a reply-acknowledgement
  -        channel.write(ByteBuffer.wrap(new byte[] {6,2,3}));
  +        try {
  +            channel.write(ByteBuffer.wrap(new byte[] {6, 2, 3}));
  +        } catch ( java.io.IOException x ) {
  +            log.warn("Unable to send ACK back through channel, channel 
disconnected?: "+x.getMessage());
  +        }
       }
   }
  
  
  
  1.4       +7 -7      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java
  
  Index: ThreadPool.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ThreadPool.java   20 Dec 2003 00:48:52 -0000      1.3
  +++ ThreadPool.java   9 Jan 2004 23:24:09 -0000       1.4
  @@ -107,11 +107,11 @@
       {
           WorkerThread worker = null;
   
  -        //synchronized (idle) {
  +        synchronized (idle) {
               if (idle.size() > 0) {
                   worker = (WorkerThread) idle.remove (0);
               }
  -        //}
  +        }
   
           return (worker);
       }
  @@ -122,8 +122,8 @@
        */
       void returnWorker (WorkerThread worker)
       {
  -        //synchronized (idle) {
  +        synchronized (idle) {
               idle.add (worker);
  -        //}
  +        }
       }
   }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
  
  Index: PooledSocketSender.java
  ===================================================================
  /*
   * $Header: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v
 1.1 2004/01/09 23:24:09 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2004/01/09 23:24:09 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact [EMAIL PROTECTED]
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.tcp;
  import java.net.InetAddress ;
  import java.net.Socket;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Collections;
  
  /**
   * <p>Title: </p>
   * <p>Description: </p>
   * <p>Copyright: Copyright (c) 2002</p>
   * <p>Company: </p>
   * @author not attributable
   * @version 1.0
   */
  
  public class PooledSocketSender implements IDataSender
  {
  
      private static org.apache.commons.logging.Log log =
          org.apache.commons.logging.LogFactory.getLog( 
org.apache.catalina.cluster.tcp.SimpleTcpCluster.class );
  
      private InetAddress address;
      private int port;
      private Socket sc = null;
      private boolean isSocketConnected = false;
      private boolean suspect;
      private long ackTimeout = 150*1000;  //15 seconds socket read timeout (for 
acknowledgement)
      private long keepAliveTimeout = 60*1000; //keep socket open for no more than one 
min
      private int keepAliveMaxRequestCount = 100; //max 100 requests before 
reconnecting
      private long keepAliveConnectTime = 0;
      private int keepAliveCount = 0;
      private int maxPoolSocketLimit = 25;
  
      private SenderQueue senderQueue = null;
  
      public PooledSocketSender(InetAddress host, int port)
      {
          this.address = host;
          this.port = port;
          senderQueue = new SenderQueue(this,maxPoolSocketLimit);
      }
  
      public InetAddress getAddress()
      {
          return address;
      }
  
      public int getPort()
      {
          return port;
      }
  
      public void connect() throws java.io.IOException
      {
          //do nothing, happens in the socket sender itself
      }
  
      public void disconnect()
      {
          senderQueue.close();
      }
  
      public boolean isConnected()
      {
          return isSocketConnected;
      }
  
      public void setAckTimeout(long timeout) {
          this.ackTimeout = timeout;
      }
  
      public long getAckTimeout() {
          return ackTimeout;
      }
  
      public void setMaxPoolSocketLimit(int limit) {
          maxPoolSocketLimit = limit;
      }
  
      public int getMaxPoolSocketLimit() {
          return maxPoolSocketLimit;
      }
  
  
      /**
       * Blocking send
       * @param data
       * @throws java.io.IOException
       */
      public void sendMessage(String sessionId, byte[] data) throws java.io.IOException
      {
          //get a socket sender from the pool
          SocketSender sender = senderQueue.getSender(0);
          //send the message
          sender.sendMessage(sessionId,data);
          //return the connection to the pool
          senderQueue.returnSender(sender);
      }
  
      public String toString() {
          StringBuffer buf = new StringBuffer("PooledSocketSender[");
          buf.append(getAddress()).append(":").append(getPort()).append("]");
          return buf.toString();
      }
  
      public boolean getSuspect() {
          return suspect;
      }
  
      public void setSuspect(boolean suspect) {
          this.suspect = suspect;
      }
  
      public long getKeepAliveTimeout() {
          return keepAliveTimeout;
      }
      public void setKeepAliveTimeout(long keepAliveTimeout) {
          this.keepAliveTimeout = keepAliveTimeout;
      }
      public int getKeepAliveMaxRequestCount() {
          return keepAliveMaxRequestCount;
      }
      public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
          this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
      }
  
      private class SenderQueue {
          private int limit = 25;
          PooledSocketSender parent = null;
          private LinkedList queue = new LinkedList();
          private LinkedList inuse = new LinkedList();
          private Object mutex = new Object();
  
          public SenderQueue(PooledSocketSender parent, int limit) {
              this.limit = limit;
              this.parent = parent;
          }
  
          public SocketSender getSender(long timeout) {
              SocketSender sender = null;
              long start = System.currentTimeMillis();
              long delta = 0;
              do {
                  synchronized (mutex) {
  
                      if ( queue.size() > 0 ) {
                          sender = (SocketSender) queue.removeFirst();
                      } else if ( inuse.size() < limit ) {
                          sender = getNewSocketSender();
                      } else {
                          try {
                              mutex.wait(timeout);
                          }catch ( Exception x ) {
                              parent.log.warn("PoolSocketSender.senderQueue.getSender 
failed",x);
                          }//catch
                      }//end if
                      if ( sender != null ) {
                          inuse.add(sender);
                      }
                  }//synchronized
                  delta = System.currentTimeMillis() - start;
              } while ( (sender == null) && (timeout==0?true:(delta<timeout)) );
              //to do
              return sender;
          }
  
          public void returnSender(SocketSender sender) {
              //to do
              synchronized (mutex) {
                  queue.add(sender);
                  inuse.remove(sender);
                  mutex.notify();
              }
          }
  
          private SocketSender getNewSocketSender() {
              //new SocketSender(
              SocketSender sender = new 
SocketSender(parent.getAddress(),parent.getPort());
              sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
              sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
              sender.setAckTimeout(parent.getAckTimeout());
              return sender;
  
          }
  
          public void close() {
              synchronized (mutex) {
                  for ( int i=0; i<queue.size(); i++ ) {
                      SocketSender sender = (SocketSender)queue.get(i);
                      sender.disconnect();
                  }//for
                  for ( int i=0; i<inuse.size(); i++ ) {
                      SocketSender sender = (SocketSender) inuse.get(i);
                      sender.disconnect();
                  }//for
                  queue.clear();
                  inuse.clear();
              }
          }
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to