Author: fhanik Date: Fri Jul 7 10:53:35 2006 New Revision: 419938 URL: http://svn.apache.org/viewvc?rev=419938&view=rev Log: Added trace and fixed remote processing exceptions
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Jul 7 10:53:35 2006 @@ -39,6 +39,8 @@ import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.Heartbeat; import org.apache.catalina.tribes.io.BufferPool; +import java.io.IOException; +import org.apache.catalina.tribes.RemoteProcessException; /** * The default implementation of a Channel.<br> @@ -231,7 +233,9 @@ public void messageReceived(ChannelMessage msg) { if ( msg == null ) return; try { - + if ( log.isTraceEnabled() ) { + log.trace("GroupChannel received msg id:"+new UniqueId(msg.getUniqueId())); + } Serializable fwd = null; if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { fwd = new ByteMessage(msg.getMessage().getBytes()); @@ -255,8 +259,13 @@ //but none was given, send back an immediate one sendNoRpcChannelReply((RpcMessage)fwd,source); } + if ( log.isTraceEnabled() ) { + log.trace("GroupChannel delivered["+rx+"] id:"+new UniqueId(msg.getUniqueId())); + } + } catch ( Exception x ) { - log.error("Unable to deserialize channel message.",x); + if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x); + throw new RemoteProcessException("IOException:"+x.getMessage(),x); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Fri Jul 7 10:53:35 2006 @@ -69,10 +69,12 @@ public synchronized void access() { this.accessed = true; + this.lastAccess = System.currentTimeMillis(); } public synchronized void finish() { this.accessed = false; + this.lastAccess = System.currentTimeMillis(); } public boolean isAccessed() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri Jul 7 10:53:35 2006 @@ -255,14 +255,21 @@ * be replicated */ public void replicate(Object key, boolean complete) { + if ( log.isTraceEnabled() ) + log.trace("Replicate invoked on key:"+key); MapEntry entry = (MapEntry)super.get(key); if ( !entry.isSerializable() ) return; if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) { Object value = entry.getValue(); //check to see if we need to replicate this object isDirty()||complete boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty()); - if (!repl)return; - + + if (!repl) { + if ( log.isTraceEnabled() ) + log.trace("Not replicating:"+key+", no change made"); + + return; + } //check to see if the message is diffable boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable()); MapMessage msg = null; @@ -439,6 +446,8 @@ log.error("Unable to deserialize MapMessage.", x); return; } + if ( log.isTraceEnabled() ) + log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg); if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapMemberAdded(mapmsg.getBackupNodes()[0]); } @@ -1008,6 +1017,32 @@ private byte[] keydata; private byte[] diffvalue; private Member[] nodes; + + public String toString() { + StringBuffer buf = new StringBuffer("MapMessage[context="); + buf.append(new String(mapId)); + buf.append("; type="); + buf.append(getTypeDesc()); + buf.append("; key="); + buf.append(key); + buf.append("; value="); + buf.append(value); + return buf.toString(); + } + + public String getTypeDesc() { + switch (msgtype) { + case MSG_BACKUP: return "MSG_BACKUP"; + case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP"; + case MSG_PROXY: return "MSG_PROXY"; + case MSG_REMOVE: return "MSG_REMOVE"; + case MSG_STATE: return "MSG_STATE"; + case MSG_START: return "MSG_START"; + case MSG_STOP: return "MSG_STOP"; + case MSG_INIT: return "MSG_INIT"; + default : return "UNKNOWN"; + } + } public MapMessage() {} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Jul 7 10:53:35 2006 @@ -23,6 +23,8 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.group.RpcCallback; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.UniqueId; /** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. @@ -145,7 +147,11 @@ //publish the backup data to one node msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, (Serializable) key, (Serializable) value, null, backup); - getChannel().send(backup, msg, getChannelSendOptions()); + if ( log.isTraceEnabled() ) + log.trace("Publishing backup data:"+msg+" to: "+next.getName()); + UniqueId id = getChannel().send(backup, msg, getChannelSendOptions()); + if ( log.isTraceEnabled() ) + log.trace("Data published:"+msg+" msg Id:"+id); //we published out to a backup, mark the test success success = true; }catch ( ChannelException x ) { @@ -157,6 +163,8 @@ if (success && proxies.length > 0 ) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, null, backup); + if ( log.isTraceEnabled() ) + log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies)); getChannel().send(proxies, msg, getChannelSendOptions()); } }catch ( ChannelException x ) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Fri Jul 7 10:53:35 2006 @@ -140,6 +140,7 @@ synchronized (events) { events.add(event); } + if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event); selector.wakeup(); } } @@ -150,6 +151,7 @@ Runnable r = null; while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { try { + if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r); r.run(); } catch ( Exception x ) { log.error("",x); @@ -194,7 +196,7 @@ if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > (long) getTimeout() && (!ka.isAccessed())) { - log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+")"); + log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())); // System.out.println("Interest:"+key.interestOps()); // System.out.println("Ready Ops:"+key.readyOps()); // System.out.println("Valid:"+key.isValid()); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419938&r1=419937&r2=419938&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Fri Jul 7 10:53:35 2006 @@ -29,6 +29,7 @@ import org.apache.catalina.tribes.io.BufferPool; import java.nio.channels.CancelledKeyException; import org.apache.catalina.tribes.UniqueId; +import org.apache.catalina.tribes.RemoteProcessException; /** * A worker thread class which can drain channels and echo-back the input. Each @@ -77,11 +78,19 @@ if (key == null) { continue; // just in case } + if ( log.isTraceEnabled() ) + log.trace("Servicing key:"+key); + try { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader == null ) { + if ( log.isTraceEnabled() ) + log.trace("No object reader, cancelling:"+key); cancelKey(key); } else { + if ( log.isTraceEnabled() ) + log.trace("Draining channel:"+key); + drainChannel(key, reader); } } catch (Exception e) { @@ -119,6 +128,10 @@ * worker thread is servicing it. */ public synchronized void serviceChannel (SelectionKey key) { + if ( log.isTraceEnabled() ) + log.trace("About to service key:"+key); + ObjectReader reader = (ObjectReader)key.attachment(); + if ( reader != null ) reader.setLastAccess(System.currentTimeMillis()); this.key = key; key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); @@ -185,6 +198,9 @@ * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + }catch ( RemoteProcessException e ) { + if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); @@ -208,6 +224,8 @@ } protected void registerForRead(final SelectionKey key, ObjectReader reader) { + if ( log.isTraceEnabled() ) + log.trace("Adding key for read event:"+key); reader.finish(); //register our OP_READ interest Runnable r = new Runnable() { @@ -219,14 +237,16 @@ // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); + if ( log.isTraceEnabled() ) + log.trace("Registering key for read:"+key); } } catch (CancelledKeyException ckx ) { NioReceiver.cancelledKey(key); + if ( log.isTraceEnabled() ) + log.trace("CKX Cancelling key:"+key); + } catch (Exception x) { - try { - key.selector().close(); - } catch (Exception ignore) {} - log.error("Unable to cycle the selector, connection disconnected?", x); + log.error("Error registering key for read:"+key,x); } } }; @@ -234,6 +254,9 @@ } private void cancelKey(final SelectionKey key) { + if ( log.isTraceEnabled() ) + log.trace("Adding key for cancel event:"+key); + ObjectReader reader = (ObjectReader)key.attachment(); if ( reader != null ) { reader.setCancelled(true); @@ -241,6 +264,9 @@ } Runnable cx = new Runnable() { public void run() { + if ( log.isTraceEnabled() ) + log.trace("Cancelling key:"+key); + NioReceiver.cancelledKey(key); } }; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]