Author: fhanik
Date: Tue Mar 14 12:15:19 2006
New Revision: 385872
URL: http://svn.apache.org/viewcvs?rev=385872&view=rev
Log:
Worked on the thread pool. As in regular BIO style, it is best to keep using
the same thread until it is done.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
Tue Mar 14 12:15:19 2006
@@ -101,9 +101,13 @@
queue.setLimit(poolSize);
}
- public void setSuspect(Boolean suspect) {
+ public void setSuspect(boolean suspect) {
this.suspect = suspect;
}
+
+ public boolean getSuspect() {
+ return suspect;
+ }
public boolean isConnected() {
return connected;
@@ -244,15 +248,4 @@
notify();
}
}
-
- public static void printArr(Object[] arr) {
- System.out.print("[");
- for (int i=0; i<arr.length; i++ ) {
- System.out.print(arr[i]);
- if ( (i+1)<arr.length )System.out.print(", ");
- }
- System.out.println("]");
- }
-
-
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Tue Mar 14 12:15:19 2006
@@ -180,7 +180,7 @@
InetSocketAddress addr = new InetSocketAddress(getBind(),
portstart);
socket.bind(addr);
setTcpListenPort(portstart);
- log.info("Nio Server Socket bound to:"+addr);
+ log.info("Receiver Server Socket bound to:"+addr);
return 0;
}catch ( IOException x) {
retries--;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
Tue Mar 14 12:15:19 2006
@@ -16,18 +16,14 @@
package org.apache.catalina.tribes.tcp.bio;
import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.MessageListener;
-import java.net.InetAddress;
-import org.apache.catalina.tribes.tcp.nio.ThreadPool;
-import java.net.ServerSocket;
-import java.net.InetSocketAddress;
import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import java.net.Socket;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.tcp.ReceiverBase;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
/**
* <p>Title: </p>
@@ -113,7 +109,6 @@
} catch (Exception x) {
log.error("Unable to run replication listener.", x);
}
-
}
public void listen() throws Exception {
@@ -125,6 +120,12 @@
while ( doListen ) {
Socket socket = null;
+ if ( pool.available() < 1 ) {
+ if ( log.isWarnEnabled() )
+ log.warn("All BIO server replication threads are busy,
unable to handle more requests until a thread is freed up.");
+ }
+ TcpReplicationThread thread =
(TcpReplicationThread)pool.getWorker();
+ if ( thread == null ) continue; //should never happen
try {
socket = serverSocket.accept();
}catch ( Exception x ) {
@@ -134,18 +135,8 @@
if ( socket == null ) continue;
socket.setReceiveBufferSize(rxBufSize);
socket.setSendBufferSize(txBufSize);
- TcpReplicationThread thread =
(TcpReplicationThread)pool.getWorker();
ObjectReader reader = new ObjectReader(socket,this);
-
- if ( thread == null ) {
- //we are out of workers, process the request on the listening
thread
- thread = getReplicationThread();
- thread.socket = socket;
- thread.reader = reader;
- thread.run();
- } else {
- thread.serviceSocket(socket,reader);
- }//end if
+ thread.serviceSocket(socket,reader);
}//while
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
Tue Mar 14 12:15:19 2006
@@ -348,7 +348,35 @@
* ChannelMessage)
*/
public void sendMessage(byte[] data) throws IOException {
- pushMessage(data);
+ boolean messageTransfered = false ;
+ IOException exception = null;
+ try {
+ // first try with existing connection
+ pushMessage(data,false);
+ messageTransfered = true ;
+ } catch (IOException x) {
+ exception = x;
+ //resend
+ if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new
Integer(port)),x);
+ try {
+ // second try with fresh connection
+ pushMessage(data,true);
+ messageTransfered = true;
+ exception = null;
+ } catch (IOException xx) {
+ exception = xx;
+ closeSocket();
+ }
+ } finally {
+ this.requestCount++;
+ keepalive();
+ if(messageTransfered) {
+
+ } else {
+ if ( exception != null ) throw exception;
+ }
+ }
+
}
@@ -444,37 +472,6 @@
writeData(data);
}
- protected void pushMessage( byte[] data) throws IOException {
- boolean messageTransfered = false ;
- IOException exception = null;
- try {
- // first try with existing connection
- pushMessage(data,false);
- messageTransfered = true ;
- } catch (IOException x) {
- exception = x;
- //resend
- if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new
Integer(port)),x);
- try {
- // second try with fresh connection
- pushMessage(data,true);
- messageTransfered = true;
- exception = null;
- } catch (IOException xx) {
- exception = xx;
- closeSocket();
- }
- } finally {
- this.requestCount++;
- keepalive();
- if(messageTransfered) {
-
- } else {
- if ( exception != null ) throw exception;
- }
- }
- }
-
/**
* Sent real cluster Message to socket stream
* FIXME send compress
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
Tue Mar 14 12:15:19 2006
@@ -22,13 +22,9 @@
public class PooledMultiSender extends PooledSender {
protected long timeout = 15000;
- protected boolean waitForAck = false;
protected int retryAttempts=0;
protected int keepAliveCount = Integer.MAX_VALUE;
protected boolean directBuf = false;
- protected int rxBufSize = 43800;
- protected int txBufSize = 25188;
- protected boolean suspect = false;
private boolean autoConnect;
private boolean useDirectBuffer;
@@ -63,10 +59,10 @@
MultipointBioSender sender = new MultipointBioSender();
sender.setAutoConnect(autoConnect);
sender.setTimeout(timeout);
- sender.setWaitForAck(waitForAck);
+ sender.setWaitForAck(getWaitForAck());
sender.setMaxRetryAttempts(retryAttempts);
- sender.setRxBufSize(rxBufSize);
- sender.setTxBufSize(txBufSize);
+ sender.setRxBufSize(getRxBufSize());
+ sender.setTxBufSize(getTxBufSize());
return sender;
}
@@ -86,16 +82,9 @@
this.retryAttempts = retryAttempts;
}
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
public void setUseDirectBuffer(boolean useDirectBuffer) {
this.useDirectBuffer = useDirectBuffer;
- }
-
- public boolean getSuspect() {
- return suspect;
}
public boolean isUseDirectBuffer() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
Tue Mar 14 12:15:19 2006
@@ -23,6 +23,7 @@
import java.net.Socket;
import java.io.InputStream;
import org.apache.catalina.tribes.tcp.ReceiverBase;
+import java.io.OutputStream;
/**
* A worker thread class which can drain channels and echo-back the input. Each
@@ -72,8 +73,8 @@
} catch ( Exception x ) {
log.error("Unable to service bio socket");
}finally {
- try {reader.close();}catch ( Exception x){}
- try {socket.close();}catch ( Exception x){}
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
reader = null;
socket = null;
}
@@ -137,8 +138,7 @@
while ( length >= 0 ) {
int count = reader.append(buf,0,length,true);
if ( count > 0 ) execute(reader);
- if ( in.available() == 0 && reader.bufferSize() == 0 ) length = -1;
- else length = in.read(buf);
+ length = in.read(buf);
}
}
@@ -163,12 +163,23 @@
*/
protected void sendAck() {
try {
- this.socket.getOutputStream().write(Constants.ACK_COMMAND);
+ OutputStream out = socket.getOutputStream();
+ out.write(Constants.ACK_COMMAND);
+ out.flush();
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + socket.getPort());
}
} catch ( java.io.IOException x ) {
log.warn("Unable to send ACK back through channel, channel
disconnected?: "+x.getMessage());
}
+ }
+
+ public void close() {
+ doRun = false;
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
+ super.close();
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Tue Mar 14 12:15:19 2006
@@ -75,6 +75,7 @@
long delta = System.currentTimeMillis() - start;
while ( (remaining>0) && (delta<timeout) ) {
remaining -= doLoop(selectTimeout,retryAttempts);
+ delta = System.currentTimeMillis() - start;
}
if ( remaining > 0 ) {
//timeout has occured
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
Tue Mar 14 12:15:19 2006
@@ -94,6 +94,10 @@
return (worker);
}
+
+ public int available() {
+ return idle.size();
+ }
/**
* Called by the worker thread to return itself to the
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
Tue Mar 14 12:15:19 2006
@@ -26,11 +26,9 @@
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.mcast.McastService;
import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
-import org.apache.catalina.tribes.tcp.nio.NioReceiver;
-import org.apache.catalina.tribes.tcp.bio.BioReceiver;
import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* <p>Title: </p>
@@ -56,7 +54,7 @@
.append("\n\t\t[-autoconnect true|false]")
.append("\n\t\t[-sync true|false]")
.append("\n\t\t[-receiver
org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
- .append("\n\t\t[-transport
org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultipointSender]")
+ .append("\n\t\t[-transport
org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultiSender]")
.append("\n\t\t[-transport.xxx transport specific property]")
.append("\n\t\t[-maddr multicastaddr]")
.append("\n\t\t[-mport multicastport]")
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]