Author: fhanik
Date: Sat Jul 1 16:16:04 2006
New Revision: 418517
URL: http://svn.apache.org/viewvc?rev=418517&view=rev
Log:
Fixed the threading hand-off; it now works correctly.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
Sat Jul 1 16:16:04 2006
@@ -62,11 +62,14 @@
}
protected void setupThread(WorkerThread thread) {
- thread.setPool(this);
- thread.setName (thread.getClass().getName()+"[" + inc()+"]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.start();
+ synchronized (thread) {
+ thread.setPool(this);
+ thread.setName(thread.getClass().getName() + "[" + inc() + "]");
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.start();
+ try {thread.wait(500); }catch ( InterruptedException x ) {}
+ }
}
/**
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
Sat Jul 1 16:16:04 2006
@@ -58,6 +58,7 @@
// loop forever waiting for work to do
public synchronized void run()
{
+ this.notify();
while (isDoRun()) {
try {
// sleep and release object lock
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
Sat Jul 1 16:16:04 2006
@@ -59,6 +59,7 @@
// loop forever waiting for work to do
public synchronized void run()
{
+ this.notify();
if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
}else {
@@ -77,8 +78,11 @@
if (key == null) {
continue; // just in case
}
+ ObjectReader reader = (ObjectReader)key.attachment();
try {
- drainChannel (key);
+ reader.setLastAccess(System.currentTimeMillis());
+ reader.access();
+ drainChannel (key,reader);
} catch (Exception e) {
//this is common, since the sockets on the other
//end expire after a certain time.
@@ -86,13 +90,15 @@
//do nothing
} else if ( e instanceof IOException ) {
//dont spew out stack traces for IO exceptions unless
debug is enabled.
- if (log.isDebugEnabled()) log.debug ("IOException in
replication worker, unable to drain channel. Probable cause: Keep alive socket
closed.", e);
- else log.warn ("IOException in replication worker, unable
to drain channel. Probable cause: Keep alive socket closed.");
+ if (log.isDebugEnabled()) log.debug ("IOException in
replication worker, unable to drain channel. Probable cause: Keep alive socket
closed["+e.getMessage()+"].", e);
+ else log.warn ("IOException in replication worker, unable
to drain channel. Probable cause: Keep alive socket
closed["+e.getMessage()+"].");
} else if ( log.isErrorEnabled() ) {
//this is a real error, log it.
log.error("Exception caught in
TcpReplicationThread.drainChannel.",e);
}
cancelKey(key);
+ } finally {
+ reader.finish();
}
key = null;
// done, ready for more, return to pool
@@ -126,59 +132,51 @@
* re-enables OP_READ and calls wakeup() on the selector
* so the selector will resume watching this channel.
*/
- protected void drainChannel (final SelectionKey key) throws Exception {
+ protected void drainChannel (final SelectionKey key, ObjectReader reader)
throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
- ObjectReader reader = (ObjectReader)key.attachment();
- reader.setLastAccess(System.currentTimeMillis());
- try {
- reader.access();
- // loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
- if ( buffer.hasArray() )
- reader.append(buffer.array(),0,count,false);
- else
- reader.append(buffer,count,false);
- buffer.clear(); // make buffer empty
- }
+ // loop while data available, channel is non-blocking
+ while ((count = channel.read (buffer)) > 0) {
+ buffer.flip(); // make buffer readable
+ if ( buffer.hasArray() )
+ reader.append(buffer.array(),0,count,false);
+ else
+ reader.append(buffer,count,false);
+ buffer.clear(); // make buffer empty
+ }
- int pkgcnt = reader.count();
+ int pkgcnt = reader.count();
- if ( pkgcnt > 0 ) {
- ChannelMessage[] msgs = reader.execute();
- for ( int i=0; i<msgs.length; i++ ) {
+ if ( pkgcnt > 0 ) {
+ ChannelMessage[] msgs = reader.execute();
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the
remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
/**
- * Use send ack here if you want to ack the request to the
remote
- * server before completing the request
- * This is considered an asynchronized request
+ * Use send ack here if you want the request to complete
on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
*/
- if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
- try {
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to
complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
- }catch ( Exception e ) {
- log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
- }
- if ( getUseBufferPool() ) {
-
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
- }
- } finally {
- reader.finish();
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
+ }catch ( Exception e ) {
+ log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ }
+ if ( getUseBufferPool() ) {
+
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
}
-
if (count < 0) {
// close channel on EOF, invalidates the key
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]