Author: fhanik Date: Mon Mar 13 13:33:03 2006 New Revision: 385661 URL: http://svn.apache.org/viewcvs?rev=385661&view=rev Log: Working on simplicity, removing all complex code, synchronization should be simple, but ideally, there should be none, two threads should never try to access the same socket
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java - copied, changed from r385654, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385661&r1=385660&r2=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Mar 13 13:33:03 2006 @@ -35,7 +35,7 @@ public boolean isConnected(); public void setRxBufSize(int size); public void setTxBufSize(int size); - public boolean checkKeepAlive(); + public boolean keepalive(); public void setTimeout(long timeout); public void setWaitForAck(boolean isWaitForAck); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385661&r1=385660&r2=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Mon Mar 13 13:33:03 2006 @@ -52,7 +52,7 @@ } public void returnSender(DataSender sender) { - sender.checkKeepAlive(); + sender.keepalive(); queue.returnSender(sender); } @@ -125,7 +125,7 @@ return poolSize; } - public boolean checkKeepAlive() { + public boolean keepalive() { //do nothing, the pool checks on every return return false; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385661&r1=385660&r2=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Mar 13 13:33:03 2006 @@ -128,18 +128,8 @@ */ public void heartbeat() { - checkKeepAlive(); + } - - /** - * Check all DataSender Socket to close socket at keepAlive mode - * @see DataSender#checkKeepAlive() - */ - public void checkKeepAlive() { - getTransport().checkKeepAlive(); - } - - /** * add new cluster member and create sender ( s. replicationMode) transfer Copied: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (from r385654, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java) URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java&r1=385654&r2=385661&rev=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Mon Mar 13 13:33:03 2006 @@ -17,15 +17,12 @@ package org.apache.catalina.tribes.tcp.bio; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.util.Arrays; import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.tcp.Constants; import org.apache.catalina.tribes.tcp.DataSender; @@ -41,9 +38,9 @@ * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $ * @since 5.5.16 */ -public class SinglePointDataSender implements DataSender { +public class BioSender implements DataSender { - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(SinglePointDataSender.class); + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(BioSender.class); /** * The string manager for this package. @@ -69,11 +66,6 @@ /** - * cluster domain - */ - private String domain; - - /** * current sender socket */ private Socket socket = null; @@ -84,11 +76,6 @@ private boolean isSocketConnected = false; /** - * Message transfer over socket ? - */ - private boolean isMessageTransferStarted = false; - - /** * sender is in suspect state (last transfer failed) */ private SenderState senderState = new SenderState(); @@ -147,20 +134,19 @@ // ------------------------------------------------------------- Constructor - public SinglePointDataSender(String domain,InetAddress host, int port) { + public BioSender(InetAddress host, int port) { this.address = host; this.port = port; - this.domain = domain; if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.create",address, new Integer(port))); } - public SinglePointDataSender(String domain,InetAddress host, int port, SenderState state) { - this(domain,host,port); + public BioSender(InetAddress host, int port, SenderState state) { + this(host,port); if ( state != null ) this.senderState = state; } - public SinglePointDataSender(String domain,InetAddress host, int port, SenderState state, int rxBufSize, int txBufSize) { - this(domain,host,port,state); + public BioSender(InetAddress host, int port, SenderState state, int rxBufSize, int txBufSize) { + this(host,port,state); this.rxBufSize = rxBufSize; this.txBufSize = txBufSize; } @@ -173,9 +159,7 @@ * <code><description>/<version></code>. */ public String getInfo() { - return (info); - } @@ -202,39 +186,10 @@ return port; } - /** - * @return Returns the domain. - */ - public String getDomain() { - return domain; - } - - /** - * @param domain The domain to set. - */ - public void setDomain(String domain) { - this.domain = domain; - } - public boolean isConnected() { return isSocketConnected; } - /** - * @return Is DataSender send a message - */ - public boolean isMessageTransferStarted() { - return isMessageTransferStarted; - } - - /** - * @param isSocketConnected - * The isSocketConnected to set. - */ - protected void setSocketConnected(boolean isSocketConnected) { - this.isSocketConnected = isSocketConnected; - } - public boolean isSuspect() { return senderState.isSuspect() || senderState.isFailing(); } @@ -315,12 +270,6 @@ public void setResend(boolean resend) { this.resend = resend; } - /** - * @return Returns the socket. - */ - public Socket getSocket() { - return socket; - } public SenderState getSenderState() { return senderState; @@ -334,13 +283,6 @@ return txBufSize; } - /** - * @param socket The socket to set. - */ - public void setSocket(Socket socket) { - this.socket = socket; - } - public void setRxBufSize(int rxBufSize) { this.rxBufSize = rxBufSize; } @@ -356,15 +298,11 @@ * @see org.apache.catalina.tribes.tcp.IDataSender#connect() */ public synchronized void connect() throws ChannelException { - if(!isMessageTransferStarted) { + try { openSocket(); - if(isConnected()) { - if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),new Integer(port),new Long(0))); - } - } else - if (log.isWarnEnabled()) - log.warn(sm.getString("IDataSender.message.create", address.getHostAddress(),new Integer(port))); + }catch ( Exception x ) { + throw new ChannelException(x); + } } @@ -374,16 +312,12 @@ * @see IDataSender#disconnect() */ public synchronized void disconnect() { - if(!isMessageTransferStarted) { boolean connect = isConnected() ; closeSocket(); if(connect) { if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),new Integer(port),new Long(0))); } - } else - if (log.isWarnEnabled()) - log.warn(sm.getString("IDataSender.message.disconnect", address.getHostAddress(),new Integer(port))); } @@ -395,18 +329,15 @@ * @return true, is socket close * @see DataSender#closeSocket() */ - public synchronized boolean checkKeepAlive() { + public synchronized boolean keepalive() { boolean isCloseSocket = true ; - if(!isMessageTransferStarted) { - if(isConnected()) { - if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout) - || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) { - closeSocket(); - } else - isCloseSocket = false ; - } - } else - isCloseSocket = false ; + if(isConnected()) { + if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout) + || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) { + closeSocket(); + } else + isCloseSocket = false ; + } return isCloseSocket; } @@ -417,14 +348,8 @@ * @see org.apache.catalina.tribes.tcp.IDataSender#sendMessage(, * ChannelMessage) */ - public synchronized void sendMessage(ChannelMessage data) throws ChannelException { - try { - pushMessage(data); - }catch ( Exception x ) { - ChannelException cx = new ChannelException(x); - cx.addFaultyMember(data.getAddress()); - throw cx; - } + public synchronized void sendMessage(byte[] data) throws IOException { + pushMessage(data); } @@ -444,7 +369,7 @@ * open real socket and set time out when waitForAck is enabled * is socket open return directly */ - protected void openSocket() throws ChannelException { + protected synchronized void openSocket() throws IOException { if(isConnected()) return ; try { @@ -459,7 +384,7 @@ getSenderState().setSuspect(); if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(0)), ex1); - throw new ChannelException(ex1); + throw (ex1); } } @@ -468,7 +393,7 @@ * @throws IOException * @throws SocketException */ - protected void createSocket() throws IOException, SocketException { + protected synchronized void createSocket() throws IOException, SocketException { socket = new Socket(getAddress(), getPort()); socket.setSendBufferSize(getTxBufSize()); socket.setReceiveBufferSize(getRxBufSize()); @@ -481,7 +406,7 @@ * @see DataSender#disconnect() * @see DataSender#closeSocket() */ - protected void closeSocket() { + protected synchronized void closeSocket() { if(isConnected()) { if (socket != null) { try { @@ -516,28 +441,22 @@ * @since 5.5.10 */ - protected void pushMessage(ChannelMessage data, boolean reconnect) throws ChannelException { - synchronized(this) { - checkKeepAlive(); - if ( reconnect ) closeSocket(); - if (!isConnected()) openSocket(); - else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis(); - } - try { - writeData(data); - } catch ( IOException x ) { - throw new ChannelException(x); - } + protected synchronized void pushMessage(byte[] data, boolean reconnect) throws IOException { + keepalive(); + if ( reconnect ) closeSocket(); + if (!isConnected()) openSocket(); + else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis(); + writeData(data); } - protected void pushMessage( ChannelMessage data) throws ChannelException { + protected synchronized void pushMessage( byte[] data) throws IOException { boolean messageTransfered = false ; - ChannelException exception = null; + IOException exception = null; try { // first try with existing connection pushMessage(data,false); messageTransfered = true ; - } catch (ChannelException x) { + } catch (IOException x) { exception = x; //resend if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x); @@ -546,17 +465,17 @@ pushMessage(data,true); messageTransfered = true; exception = null; - } catch (ChannelException xx) { + } catch (IOException xx) { exception = xx; closeSocket(); } } finally { this.keepAliveCount++; - checkKeepAlive(); + keepalive(); if(messageTransfered) { - if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength()))); + } else { - if ( exception != null ) throw new ChannelException(exception); + if ( exception != null ) throw exception; } } } @@ -568,19 +487,10 @@ * @throws IOException * @since 5.5.10 */ - protected void writeData(ChannelMessage data) throws IOException { - synchronized(this) { - isMessageTransferStarted = true ; - } - try { - socket.getOutputStream().write(XByteBuffer.createDataPackage((ClusterData)data)); - socket.getOutputStream().flush(); - if (getWaitForAck()) waitForAck(); - } finally { - synchronized(this) { - isMessageTransferStarted = false ; - } - } + protected synchronized void writeData(byte[] data) throws IOException { + socket.getOutputStream().write(data); + socket.getOutputStream().flush(); + if (getWaitForAck()) waitForAck(); } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java?rev=385661&r1=385660&r2=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java Mon Mar 13 13:33:03 2006 @@ -23,6 +23,7 @@ import org.apache.catalina.tribes.tcp.DataSender; import org.apache.catalina.tribes.tcp.PooledSender; import org.apache.catalina.tribes.tcp.SenderState; +import java.io.IOException; /** * Send cluster messages with a pool of sockets (25). @@ -158,14 +159,14 @@ * @param data Message data * @throws java.io.IOException */ - public void sendMessage(ChannelMessage data) throws ChannelException { + public void sendMessage(byte[] data) throws IOException, ChannelException { //get a socket sender from the pool if(!isConnected()) { synchronized(this) { if(!isConnected()) connect(); } } - SinglePointDataSender sender = (SinglePointDataSender)getSender(); + BioSender sender = (BioSender)getSender(); if (sender == null) { log.warn("Sender queue is empty. Can not send any messages."); return; @@ -187,7 +188,7 @@ public DataSender getNewDataSender() { //new DataSender( - SinglePointDataSender sender = new SinglePointDataSender(getDomain(), + BioSender sender = new BioSender( getHost(), getPort(), getSenderState() ); Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385661&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Mon Mar 13 13:33:03 2006 @@ -0,0 +1,194 @@ +package org.apache.catalina.tribes.tcp.bio; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.tcp.MultiPointSender; +import org.apache.catalina.tribes.io.XByteBuffer; +import java.nio.channels.Selector; +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.Map; +import java.util.HashMap; +import org.apache.catalina.tribes.tcp.nio.NioSender; +import java.util.Iterator; +import org.apache.catalina.tribes.io.ClusterData; +import java.net.InetAddress; +import org.apache.catalina.tribes.tcp.SenderState; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class MultipointBioSender implements MultiPointSender { + public MultipointBioSender() { + } + + protected long timeout = 15000; + protected long selectTimeout = 1000; + protected boolean waitForAck = false; + protected int retryAttempts=0; + protected int keepAliveCount = Integer.MAX_VALUE; + protected HashMap bioSenders = new HashMap(); + protected boolean directBuf = false; + protected int rxBufSize = 43800; + protected int txBufSize = 25188; + protected boolean suspect = false; + private boolean connected; + private boolean autoConnect; + + public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { + long start = System.currentTimeMillis(); + byte[] data = XByteBuffer.createDataPackage((ClusterData)msg); + BioSender[] senders = setupForSend(destination); + ChannelException cx = null; + for ( int i=0; i<senders.length; i++ ) { + try { + senders[i].sendMessage(data); + } catch (Exception x) { + if (cx == null) cx = new ChannelException(x); + cx.addFaultyMember(destination[i]); + } + } + if (cx!=null ) throw cx; + } + + + + private BioSender[] setupForSend(Member[] destination) throws ChannelException { + ChannelException cx = null; + BioSender[] result = new BioSender[destination.length]; + for ( int i=0; i<destination.length; i++ ) { + try { + BioSender sender = (BioSender) bioSenders.get(destination[i]); + if (sender == null) { + InetAddress dest = InetAddress.getByAddress(destination[i].getHost()); + sender = new BioSender(dest, destination[i].getPort(), new SenderState(), rxBufSize, txBufSize); + bioSenders.put(destination[i], sender); + } + sender.setWaitForAck(waitForAck); + result[i] = sender; + if (!result[i].isConnected() ) result[i].connect(); + result[i].keepalive(); + }catch (Exception x ) { + if ( cx== null ) cx = new ChannelException(x); + cx.addFaultyMember(destination[i]); + } + } + if ( cx!=null ) throw cx; + else return result; + } + + public void connect() { + //do nothing, we connect on demand + setConnected(true); + } + + + private synchronized void close() throws ChannelException { + ChannelException x = null; + Object[] members = bioSenders.keySet().toArray(); + for (int i=0; i<members.length; i++ ) { + Member mbr = (Member)members[i]; + try { + NioSender sender = (NioSender)bioSenders.get(mbr); + sender.disconnect(); + }catch ( Exception e ) { + if ( x == null ) x = new ChannelException(e); + x.addFaultyMember(mbr); + } + bioSenders.remove(mbr); + } + if ( x != null ) throw x; + } + + public void memberAdded(Member member) { + + } + + public void memberRemoved(Member member) { + //disconnect senders + NioSender sender = (NioSender)bioSenders.remove(member); + if ( sender != null ) sender.disconnect(); + } + + + public synchronized void disconnect() { + try {close(); }catch (Exception x){} + setConnected(false); + } + + public void finalize() { + try {disconnect(); }catch ( Exception ignore){} + } + + public boolean getSuspect() { + return suspect; + } + + public boolean isConnected() { + return connected; + } + + public boolean isAutoConnect() { + return autoConnect; + } + + public void setSuspect(boolean suspect) { + this.suspect = suspect; + } + + public void setUseDirectBuffer(boolean directBuf) { + this.directBuf = directBuf; + } + + public void setMaxRetryAttempts(int attempts) { + this.retryAttempts = attempts; + } + + public void setTxBufSize(int size) { + this.txBufSize = size; + } + + public void setRxBufSize(int size) { + this.rxBufSize = size; + } + + public void setWaitForAck(boolean wait) { + this.waitForAck = wait; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + + public void setAutoConnect(boolean autoConnect) { + this.autoConnect = autoConnect; + } + + public boolean keepalive() { + //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented"); + boolean result = false; + Map.Entry[] entries = (Map.Entry[])bioSenders.entrySet().toArray(new Map.Entry[bioSenders.size()]); + for ( int i=0; i<entries.length; i++ ) { + NioSender sender = (NioSender)entries[i].getValue(); + if ( sender.checkKeepAlive() ) { + bioSenders.remove(sender.getDestination()); + } + } + return result; + } + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385661&r1=385660&r2=385661&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Mon Mar 13 13:33:03 2006 @@ -273,7 +273,7 @@ this.autoConnect = autoConnect; } - public boolean checkKeepAlive() { + public boolean keepalive() { //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented"); boolean result = false; Map.Entry[] entries = (Map.Entry[])nioSenders.entrySet().toArray(new Map.Entry[nioSenders.size()]); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]