This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 10.1.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/10.1.x by this push: new fa0e97e9fd Code clean-up - formatting. No functional change fa0e97e9fd is described below commit fa0e97e9fdc62fd90320869e9fa51c40dead3d0c Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri May 10 14:59:18 2024 +0100 Code clean-up - formatting. No functional change --- .../catalina/tribes/transport/AbstractRxTask.java | 3 +- .../catalina/tribes/transport/AbstractSender.java | 15 +- .../catalina/tribes/transport/Constants.java | 8 +- .../catalina/tribes/transport/DataSender.java | 7 + .../tribes/transport/MultiPointSender.java | 9 +- .../catalina/tribes/transport/PooledSender.java | 51 ++--- .../catalina/tribes/transport/ReceiverBase.java | 66 ++++--- .../tribes/transport/ReplicationTransmitter.java | 11 +- .../catalina/tribes/transport/RxTaskPool.java | 48 ++--- .../catalina/tribes/transport/SenderState.java | 6 +- .../catalina/tribes/transport/nio/NioReceiver.java | 196 +++++++++---------- .../tribes/transport/nio/NioReplicationTask.java | 212 ++++++++++----------- .../catalina/tribes/transport/nio/NioSender.java | 195 +++++++++---------- .../tribes/transport/nio/ParallelNioSender.java | 141 +++++++------- .../tribes/transport/nio/PooledParallelSender.java | 16 +- 15 files changed, 502 insertions(+), 482 deletions(-) diff --git a/java/org/apache/catalina/tribes/transport/AbstractRxTask.java b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java index 2a95f7dd53..2d17ae166b 100644 --- a/java/org/apache/catalina/tribes/transport/AbstractRxTask.java +++ b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java @@ -18,8 +18,7 @@ package org.apache.catalina.tribes.transport; import org.apache.catalina.tribes.io.ListenCallback; -public abstract class AbstractRxTask implements Runnable -{ +public abstract class AbstractRxTask implements Runnable { public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; diff --git a/java/org/apache/catalina/tribes/transport/AbstractSender.java b/java/org/apache/catalina/tribes/transport/AbstractSender.java index ba17303123..f3b7a0a563 100644 --- a/java/org/apache/catalina/tribes/transport/AbstractSender.java +++ b/java/org/apache/catalina/tribes/transport/AbstractSender.java @@ -37,7 +37,7 @@ public abstract class AbstractSender implements DataSender { private Member destination; private InetAddress address; private int port; - private int maxRetryAttempts = 1;//1 resends + private int maxRetryAttempts = 1;// 1 resends private int attempt; private boolean tcpNoDelay = true; private boolean soKeepAlive = false; @@ -52,8 +52,9 @@ public abstract class AbstractSender implements DataSender { /** * transfers sender properties from one sender to another + * * @param from AbstractSender - * @param to AbstractSender + * @param to AbstractSender */ public static void transferProperties(AbstractSender from, AbstractSender to) { to.rxBufSize = from.rxBufSize; @@ -87,19 +88,19 @@ public abstract class AbstractSender implements DataSender { public boolean keepalive() { boolean disconnect = false; if (isUdpBased()) { - disconnect = true; //always disconnect UDP, TODO optimize the keepalive handling - } else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) { + disconnect = true; // always disconnect UDP, TODO optimize the keepalive handling + } else if (keepAliveCount >= 0 && requestCount > keepAliveCount) { disconnect = true; - } else if ( keepAliveTime >= 0 && (System.currentTimeMillis()-connectTime)>keepAliveTime ) { + } else if (keepAliveTime >= 0 && (System.currentTimeMillis() - connectTime) > keepAliveTime) { disconnect = true; } - if ( disconnect ) { + if (disconnect) { disconnect(); } return disconnect; } - protected void setConnected(boolean connected){ + protected void setConnected(boolean connected) { this.connected = connected; } diff --git a/java/org/apache/catalina/tribes/transport/Constants.java b/java/org/apache/catalina/tribes/transport/Constants.java index 07946d826e..46983a493d 100644 --- a/java/org/apache/catalina/tribes/transport/Constants.java +++ b/java/org/apache/catalina/tribes/transport/Constants.java @@ -19,8 +19,8 @@ package org.apache.catalina.tribes.transport; import org.apache.catalina.tribes.io.XByteBuffer; /** - * Manifest constants for the <code>org.apache.catalina.tribes.transport</code> - * package. + * Manifest constants for the <code>org.apache.catalina.tribes.transport</code> package. + * * @author Peter Rossbach */ public class Constants { @@ -33,8 +33,8 @@ public class Constants { /* * Do not change any of these values! */ - public static final byte[] ACK_DATA = new byte[] {6, 2, 3}; - public static final byte[] FAIL_ACK_DATA = new byte[] {11, 0, 5}; + public static final byte[] ACK_DATA = new byte[] { 6, 2, 3 }; + public static final byte[] FAIL_ACK_DATA = new byte[] { 11, 0, 5 }; public static final byte[] ACK_COMMAND = XByteBuffer.createDataPackage(ACK_DATA); public static final byte[] FAIL_ACK_COMMAND = XByteBuffer.createDataPackage(FAIL_ACK_DATA); diff --git a/java/org/apache/catalina/tribes/transport/DataSender.java b/java/org/apache/catalina/tribes/transport/DataSender.java index decea50e16..96092a7d6b 100644 --- a/java/org/apache/catalina/tribes/transport/DataSender.java +++ b/java/org/apache/catalina/tribes/transport/DataSender.java @@ -22,6 +22,7 @@ public interface DataSender { /** * Connect. + * * @throws IOException when an error occurs */ void connect() throws IOException; @@ -38,36 +39,42 @@ public interface DataSender { /** * Set the receive buffer size. + * * @param size the new size */ void setRxBufSize(int size); /** * Set the transmit buffer size. + * * @param size the new size */ void setTxBufSize(int size); /** * Keepalive. + * * @return {@code true} if kept alive */ boolean keepalive(); /** * Set the socket timeout. + * * @param timeout in ms */ void setTimeout(long timeout); /** * Set the amount of requests during which to keepalive. + * * @param maxRequests the amount of requests */ void setKeepAliveCount(int maxRequests); /** * Set the keepalive time. + * * @param keepAliveTimeInMs the time in ms */ void setKeepAliveTime(long keepAliveTimeInMs); diff --git a/java/org/apache/catalina/tribes/transport/MultiPointSender.java b/java/org/apache/catalina/tribes/transport/MultiPointSender.java index d6b3d4633c..f6bebaa746 100644 --- a/java/org/apache/catalina/tribes/transport/MultiPointSender.java +++ b/java/org/apache/catalina/tribes/transport/MultiPointSender.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.catalina.tribes.transport; + import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; @@ -23,32 +24,38 @@ public interface MultiPointSender extends DataSender { /** * Send the specified message. + * * @param destination the message destinations - * @param data the data to send + * @param data the data to send + * * @throws ChannelException if an error occurs */ void sendMessage(Member[] destination, ChannelMessage data) throws ChannelException; /** * Set the maximum retry attempts. + * * @param attempts the retry count */ void setMaxRetryAttempts(int attempts); /** * Configure the use of a direct buffer. + * * @param directBuf {@code true} to use a direct buffer */ void setDirectBuffer(boolean directBuf); /** * Send to the specified member. + * * @param member the member */ void add(Member member); /** * Stop sending to the specified member. + * * @param member the member */ void remove(Member member); diff --git a/java/org/apache/catalina/tribes/transport/PooledSender.java b/java/org/apache/catalina/tribes/transport/PooledSender.java index 457230c685..b0af0fa4c9 100644 --- a/java/org/apache/catalina/tribes/transport/PooledSender.java +++ b/java/org/apache/catalina/tribes/transport/PooledSender.java @@ -28,14 +28,14 @@ import org.apache.juli.logging.LogFactory; public abstract class PooledSender extends AbstractSender implements MultiPointSender { private static final Log log = LogFactory.getLog(PooledSender.class); - protected static final StringManager sm = - StringManager.getManager(Constants.Package); + protected static final StringManager sm = StringManager.getManager(Constants.Package); private final SenderQueue queue; private int poolSize = 25; private long maxWait = 3000; + public PooledSender() { - queue = new SenderQueue(this,poolSize); + queue = new SenderQueue(this, poolSize); } public abstract DataSender getNewDataSender(); @@ -51,7 +51,7 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS @Override public synchronized void connect() throws IOException { - //do nothing, happens in the socket sender itself + // do nothing, happens in the socket sender itself queue.open(); setConnected(true); } @@ -91,8 +91,8 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS @Override public boolean keepalive() { - //do nothing, the pool checks on every return - return (queue==null)?false:queue.checkIdleKeepAlive(); + // do nothing, the pool checks on every return + return (queue == null) ? false : queue.checkIdleKeepAlive(); } @Override @@ -102,12 +102,12 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS @Override public void remove(Member member) { - //no op for now, should not cancel out any keys - //can create serious sync issues - //all TCP connections are cleared out through keepalive - //and if remote node disappears + // no op for now, should not cancel out any keys + // can create serious sync issues + // all TCP connections are cleared out through keepalive + // and if remote node disappears } - // ----------------------------------------------------- Inner Class + // ----------------------------------------------------- Inner Class private static class SenderQueue { private int limit = 25; @@ -133,6 +133,7 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS public int getLimit() { return limit; } + /** * @param limit The limit to set. */ @@ -159,7 +160,7 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS public synchronized DataSender getSender(long timeout) { long start = System.currentTimeMillis(); - while ( true ) { + while (true) { if (!isOpen) { throw new IllegalStateException(sm.getString("pooledSender.closed.queue")); } @@ -172,35 +173,35 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS if (sender != null) { inuse.add(sender); return sender; - }//end if + } // end if long delta = System.currentTimeMillis() - start; - if ( delta > timeout && timeout>0) { + if (delta > timeout && timeout > 0) { return null; } else { try { - wait(Math.max(timeout - delta,1)); - }catch (InterruptedException x){} - }//end if + wait(Math.max(timeout - delta, 1)); + } catch (InterruptedException x) { + } + } // end if } } public synchronized void returnSender(DataSender sender) { - if ( !isOpen) { + if (!isOpen) { sender.disconnect(); return; } - //to do + // to do inuse.remove(sender); - //just in case the limit has changed - if ( notinuse.size() < this.getLimit() ) { + // just in case the limit has changed + if (notinuse.size() < this.getLimit()) { notinuse.add(sender); } else { try { sender.disconnect(); } catch (Exception e) { if (log.isDebugEnabled()) { - log.debug(sm.getString( - "PooledSender.senderDisconnectFail"), e); + log.debug(sm.getString("PooledSender.senderDisconnectFail"), e); } } } @@ -214,11 +215,11 @@ public abstract class PooledSender extends AbstractSender implements MultiPointS for (Object value : unused) { DataSender sender = (DataSender) value; sender.disconnect(); - }//for + } // for for (Object o : used) { DataSender sender = (DataSender) o; sender.disconnect(); - }//for + } // for notinuse.clear(); inuse.clear(); notifyAll(); diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index 98385a750b..385dd28390 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -53,7 +53,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private MessageListener listener; private String host = "auto"; private InetAddress bind; - private int port = 4000; + private int port = 4000; private int udpPort = -1; private int securePort = -1; private int rxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE; @@ -65,7 +65,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private RxTaskPool pool; private boolean direct = true; private long tcpSelectorTimeout = 5000; - //how many times to search for an available socket + // how many times to search for an available socket private int autoBind = 100; private int maxThreads = 15; private int minThreads = 6; @@ -78,7 +78,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private boolean soLingerOn = true; private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; - private int timeout = 3000; //3 seconds + private int timeout = 3000; // 3 seconds private boolean useBufferPool = true; private boolean daemon = true; private long maxIdleTime = 60000; @@ -96,8 +96,9 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R @Override public void start() throws IOException { - if ( executor == null ) { - //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); + if (executor == null) { + // executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new + // LinkedBlockingQueue<Runnable>()); String channelName = ""; if (channel.getName() != null) { channelName = "[" + channel.getName() + "]"; @@ -114,9 +115,8 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R @Override public void stop() { - if ( executor != null ) - { - executor.shutdownNow();//ignore left overs + if (executor != null) { + executor.shutdownNow();// ignore left overs } executor = null; if (oname != null) { @@ -181,14 +181,14 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R } /** - * Attempts to bind using the provided port and if that fails attempts to - * bind to each of the ports from portstart to (portstart + retries -1) - * until either there are no more ports or the bind is successful. The - * address to bind to is obtained via a call to {link {@link #getBind()}. - * @param socket The socket to bind - * @param portstart Starting port for bind attempts - * @param retries Number of times to attempt to bind (port incremented - * between attempts) + * Attempts to bind using the provided port and if that fails attempts to bind to each of the ports from portstart + * to (portstart + retries -1) until either there are no more ports or the bind is successful. The address to bind + * to is obtained via a call to {link {@link #getBind()}. + * + * @param socket The socket to bind + * @param portstart Starting port for bind attempts + * @param retries Number of times to attempt to bind (port incremented between attempts) + * * @throws IOException Socket bind error */ protected void bind(ServerSocket socket, int portstart, int retries) throws IOException { @@ -202,9 +202,9 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R setPort(port); log.info(sm.getString("receiverBase.socket.bind", addr)); retries = 0; - } catch ( IOException x) { + } catch (IOException x) { retries--; - if ( retries <= 0 ) { + if (retries <= 0) { log.info(sm.getString("receiverBase.unable.bind", addr)); throw x; } @@ -216,25 +216,27 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R /** * Same as bind() except it does it for the UDP port + * * @param socket The socket to bind * @param portstart Starting port for bind attempts - * @param retries Number of times to attempt to bind (port incremented - * between attempts) + * @param retries Number of times to attempt to bind (port incremented between attempts) + * * @return int The retry count + * * @throws IOException Socket bind error */ protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException { InetSocketAddress addr = null; - while ( retries > 0 ) { + while (retries > 0) { try { addr = new InetSocketAddress(getBind(), portstart); socket.bind(addr); setUdpPort(portstart); log.info(sm.getString("receiverBase.udp.bind", addr)); return 0; - }catch ( IOException x) { + } catch (IOException x) { retries--; - if ( retries <= 0 ) { + if (retries <= 0) { log.info(sm.getString("receiverBase.unable.bind.udp", addr)); throw x; } @@ -244,7 +246,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R } catch (InterruptedException ti) { Thread.currentThread().interrupt(); } - retries = bindUdp(socket,portstart,retries); + retries = bindUdp(socket, portstart, retries); } } return retries; @@ -253,8 +255,8 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R @Override public void messageDataReceived(ChannelMessage data) { - if ( this.listener != null ) { - if ( listener.accept(data) ) { + if (this.listener != null) { + if (listener.accept(data)) { listener.messageReceived(data); } } @@ -262,7 +264,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public int getWorkerThreadOptions() { int options = 0; - if ( getDirect() ) { + if (getDirect()) { options = options | OPTION_DIRECT_BUFFER; } return options; @@ -281,7 +283,6 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R } - public void setDirect(boolean direct) { this.direct = direct; } @@ -395,6 +396,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public void setAddress(String host) { this.host = host; } + public void setHost(String host) { setAddress(host); } @@ -413,7 +415,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public void setAutoBind(int autoBind) { this.autoBind = autoBind; - if ( this.autoBind <= 0 ) { + if (this.autoBind <= 0) { this.autoBind = 1; } } @@ -481,7 +483,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R @Override public void heartbeat() { - //empty operation + // empty operation } @Override @@ -522,6 +524,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R // ---------------------------------------------- stats of the thread pool /** * Return the current number of threads that are managed by the pool. + * * @return the current number of threads that are managed by the pool */ public int getPoolSize() { @@ -534,6 +537,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R /** * Return the current number of threads that are in use. + * * @return the current number of threads that are in use */ public int getActiveCount() { @@ -546,6 +550,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R /** * Return the total number of tasks that have ever been scheduled for execution by the pool. + * * @return the total number of tasks that have ever been scheduled for execution by the pool */ public long getTaskCount() { @@ -558,6 +563,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R /** * Return the total number of tasks that have completed execution by the pool. + * * @return the total number of tasks that have completed execution by the pool */ public long getCompletedTaskCount() { diff --git a/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java b/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java index 78074eda61..f4fb162533 100644 --- a/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java +++ b/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java @@ -29,9 +29,7 @@ import org.apache.catalina.tribes.jmx.JmxRegistry; import org.apache.catalina.tribes.transport.nio.PooledParallelSender; /** - * Transmit message to other cluster members - * Actual senders are created based on the replicationMode - * type + * Transmit message to other cluster members Actual senders are created based on the replicationMode type */ public class ReplicationTransmitter implements ChannelSender { @@ -60,7 +58,7 @@ public class ReplicationTransmitter implements ChannelSender { @Override public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException { MultiPointSender sender = getTransport(); - sender.sendMessage(destination,message); + sender.sendMessage(destination, message); } @@ -101,14 +99,13 @@ public class ReplicationTransmitter implements ChannelSender { */ @Override public void heartbeat() { - if (getTransport()!=null) { + if (getTransport() != null) { getTransport().keepalive(); } } /** - * add new cluster member and create sender ( s. replicationMode) transfer - * current properties to sender + * add new cluster member and create sender ( s. replicationMode) transfer current properties to sender * * @see org.apache.catalina.tribes.ChannelSender#add(org.apache.catalina.tribes.Member) */ diff --git a/java/org/apache/catalina/tribes/transport/RxTaskPool.java b/java/org/apache/catalina/tribes/transport/RxTaskPool.java index d17dde5506..4d36367476 100644 --- a/java/org/apache/catalina/tribes/transport/RxTaskPool.java +++ b/java/org/apache/catalina/tribes/transport/RxTaskPool.java @@ -15,13 +15,13 @@ * limitations under the License. */ package org.apache.catalina.tribes.transport; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** - * A very simple thread pool class. The pool size is set at - * construction time and remains fixed. Threads are cycled + * A very simple thread pool class. The pool size is set at construction time and remains fixed. Threads are cycled * through a FIFO idle queue. */ public class RxTaskPool { @@ -38,7 +38,7 @@ public class RxTaskPool { private final TaskCreator creator; - public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception { + public RxTaskPool(int maxTasks, int minTasks, TaskCreator creator) throws Exception { // fill up the pool with worker threads this.maxTasks = maxTasks; this.minTasks = minTasks; @@ -48,30 +48,30 @@ public class RxTaskPool { protected void configureTask(AbstractRxTask task) { synchronized (task) { task.setTaskPool(this); -// task.setName(task.getClass().getName() + "[" + inc() + "]"); -// task.setDaemon(true); -// task.setPriority(Thread.MAX_PRIORITY); -// task.start(); + // task.setName(task.getClass().getName() + "[" + inc() + "]"); + // task.setDaemon(true); + // task.setPriority(Thread.MAX_PRIORITY); + // task.start(); } } /** - * Find an idle worker thread, if any. Could return null. + * Find an idle worker thread, if any. Could return null. + * * @return a worker */ - public AbstractRxTask getRxTask() - { + public AbstractRxTask getRxTask() { AbstractRxTask worker = null; synchronized (mutex) { - while ( worker == null && running ) { + while (worker == null && running) { if (idle.size() > 0) { try { worker = idle.remove(0); } catch (java.util.NoSuchElementException x) { - //this means that there are no available workers + // this means that there are no available workers worker = null; } - } else if ( used.size() < this.maxTasks && creator != null) { + } else if (used.size() < this.maxTasks && creator != null) { worker = creator.createRxTask(); configureTask(worker); } else { @@ -81,8 +81,8 @@ public class RxTaskPool { Thread.currentThread().interrupt(); } } - }//while - if ( worker != null ) { + } // while + if (worker != null) { used.add(worker); } } @@ -96,17 +96,17 @@ public class RxTaskPool { } /** - * Called by the worker thread to return itself to the - * idle pool. + * Called by the worker thread to return itself to the idle pool. + * * @param worker The worker */ - public void returnWorker (AbstractRxTask worker) { - if ( running ) { + public void returnWorker(AbstractRxTask worker) { + if (running) { synchronized (mutex) { used.remove(worker); - //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); - if ( idle.size() < maxTasks && !idle.contains(worker)) { - idle.add(worker); //let max be the upper limit + // if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); + if (idle.size() < maxTasks && !idle.contains(worker)) { + idle.add(worker); // let max be the upper limit } else { worker.close(); } @@ -129,7 +129,7 @@ public class RxTaskPool { running = false; synchronized (mutex) { Iterator<AbstractRxTask> i = idle.iterator(); - while ( i.hasNext() ) { + while (i.hasNext()) { AbstractRxTask worker = i.next(); returnWorker(worker); i.remove(); @@ -149,7 +149,7 @@ public class RxTaskPool { return this.creator; } - public interface TaskCreator { + public interface TaskCreator { AbstractRxTask createRxTask(); } } diff --git a/java/org/apache/catalina/tribes/transport/SenderState.java b/java/org/apache/catalina/tribes/transport/SenderState.java index fedac53917..a333ae127d 100644 --- a/java/org/apache/catalina/tribes/transport/SenderState.java +++ b/java/org/apache/catalina/tribes/transport/SenderState.java @@ -27,7 +27,7 @@ public class SenderState { public static final int SUSPECT = 1; public static final int FAILING = 2; - protected static final ConcurrentMap<Member, SenderState> memberStates = new ConcurrentHashMap<>(); + protected static final ConcurrentMap<Member,SenderState> memberStates = new ConcurrentHashMap<>(); public static SenderState getSenderState(Member member) { return getSenderState(member, true); @@ -54,7 +54,7 @@ public class SenderState { private volatile int state = READY; - // ----------------------------------------------------- Constructor + // ----------------------------------------------------- Constructor private SenderState() { @@ -90,6 +90,6 @@ public class SenderState { } - // ----------------------------------------------------- Public Properties + // ----------------------------------------------------- Public Properties } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 093851b2b2..4f3c1c1668 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -71,11 +71,11 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB public void start() throws IOException { super.start(); try { - setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this)); + setPool(new RxTaskPool(getMaxThreads(), getMinThreads(), this)); } catch (Exception x) { log.fatal(sm.getString("nioReceiver.threadpool.fail"), x); - if ( x instanceof IOException ) { - throw (IOException)x; + if (x instanceof IOException) { + throw (IOException) x; } else { throw new IOException(x.getMessage()); } @@ -92,8 +92,8 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB t.start(); } catch (Exception x) { log.fatal(sm.getString("nioReceiver.start.fail"), x); - if ( x instanceof IOException ) { - throw (IOException)x; + if (x instanceof IOException) { + throw (IOException) x; } else { throw new IOException(x.getMessage()); } @@ -102,7 +102,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB @Override public AbstractRxTask createRxTask() { - NioReplicationTask thread = new NioReplicationTask(this,this); + NioReplicationTask thread = new NioReplicationTask(this, this); thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); thread.setOptions(getWorkerThreadOptions()); @@ -110,7 +110,6 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB } - protected void bind() throws IOException { // allocate an unbound server socket channel serverChannel = ServerSocketChannel.open(); @@ -119,19 +118,19 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB // create a new Selector for use below this.selector.set(Selector.open()); // set the port the server channel will listen to - //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); - bind(serverSocket,getPort(),getAutoBind()); + // serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + bind(serverSocket, getPort(), getAutoBind()); // set non-blocking mode for the listening socket serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(this.selector.get(), SelectionKey.OP_ACCEPT); - //set up the datagram channel - if (this.getUdpPort()>0) { + // set up the datagram channel + if (this.getUdpPort() > 0) { datagramChannel = DatagramChannel.open(); configureDatagraChannel(); - //bind to the address to avoid security checks - bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind()); + // bind to the address to avoid security checks + bindUdp(datagramChannel.socket(), getUdpPort(), getAutoBind()); } } @@ -162,7 +161,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB return; } Runnable r = null; - while ((r = events.pollFirst()) != null ) { + while ((r = events.pollFirst()) != null) { try { if (log.isTraceEnabled()) { log.trace("Processing event in selector:" + r); @@ -175,73 +174,84 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB } public static void cancelledKey(SelectionKey key) { - ObjectReader reader = (ObjectReader)key.attachment(); - if ( reader != null ) { + ObjectReader reader = (ObjectReader) key.attachment(); + if (reader != null) { reader.setCancelled(true); reader.finish(); } key.cancel(); key.attach(null); if (key.channel() instanceof SocketChannel) { - try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) { - log.debug(sm.getString("nioReceiver.closeError"), e); - } } + try { + ((SocketChannel) key.channel()).socket().close(); + } catch (IOException e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("nioReceiver.closeError"), e); + } + } } if (key.channel() instanceof DatagramChannel) { - try { ((DatagramChannel)key.channel()).socket().close(); } catch (Exception e) { if (log.isDebugEnabled()) { + try { + ((DatagramChannel) key.channel()).socket().close(); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("nioReceiver.closeError"), e); + } + } + } + try { + key.channel().close(); + } catch (IOException e) { + if (log.isDebugEnabled()) { log.debug(sm.getString("nioReceiver.closeError"), e); - } } + } } - try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) { - log.debug(sm.getString("nioReceiver.closeError"), e); - } } } + protected long lastCheck = System.currentTimeMillis(); + protected void socketTimeouts() { long now = System.currentTimeMillis(); - if ( (now-lastCheck) < getSelectorTimeout() ) { + if ((now - lastCheck) < getSelectorTimeout()) { return; } - //timeout + // timeout Selector tmpsel = this.selector.get(); - Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null; - if ( keys == null ) { + Set<SelectionKey> keys = (isListening() && tmpsel != null) ? tmpsel.keys() : null; + if (keys == null) { return; } for (SelectionKey key : keys) { try { -// if (key.interestOps() == SelectionKey.OP_READ) { -// //only timeout sockets that we are waiting for a read from -// ObjectReader ka = (ObjectReader) key.attachment(); -// long delta = now - ka.getLastAccess(); -// if (delta > (long) getTimeout()) { -// cancelledKey(key); -// } -// } -// else - if ( key.interestOps() == 0 ) { - //check for keys that didn't make it in. + // if (key.interestOps() == SelectionKey.OP_READ) { + // //only timeout sockets that we are waiting for a read from + // ObjectReader ka = (ObjectReader) key.attachment(); + // long delta = now - ka.getLastAccess(); + // if (delta > (long) getTimeout()) { + // cancelledKey(key); + // } + // } + // else + if (key.interestOps() == 0) { + // check for keys that didn't make it in. ObjectReader ka = (ObjectReader) key.attachment(); - if ( ka != null ) { + if (ka != null) { long delta = now - ka.getLastAccess(); if (delta > getTimeout() && (!ka.isAccessed())) { if (log.isWarnEnabled()) { - log.warn(sm.getString( - "nioReceiver.threadsExhausted", - Integer.valueOf(getTimeout()), - Boolean.valueOf(ka.isCancelled()), - key, + log.warn(sm.getString("nioReceiver.threadsExhausted", Integer.valueOf(getTimeout()), + Boolean.valueOf(ka.isCancelled()), key, new java.sql.Timestamp(ka.getLastAccess()))); } ka.setLastAccess(now); - //key.interestOps(SelectionKey.OP_READ); - }//end if + // key.interestOps(SelectionKey.OP_READ); + } // end if } else { cancelledKey(key); - }//end if - }//end if - }catch ( CancelledKeyException ckx ) { + } // end if + } // end if + } catch (CancelledKeyException ckx) { cancelledKey(key); } } @@ -250,8 +260,8 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB /** - * Get data from channel and store in byte array - * send it to cluster + * Get data from channel and store in byte array send it to cluster + * * @throws IOException IO error */ protected void listen() throws Exception { @@ -265,9 +275,9 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB // Avoid NPEs if selector is set to null on stop. Selector selector = this.selector.get(); - if (selector!=null && datagramChannel!=null) { - ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet - registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); + if (selector != null && datagramChannel != null) { + ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size for a datagram packet + registerChannel(selector, datagramChannel, SelectionKey.OP_READ, oreader); } while (doListen() && selector != null) { @@ -278,22 +288,22 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB socketTimeouts(); int n = selector.select(getSelectorTimeout()); if (n == 0) { - //there is a good chance that we got here - //because the TcpReplicationThread called - //selector wakeup(). - //if that happens, we must ensure that that - //thread has enough time to call interestOps -// synchronized (interestOpsMutex) { - //if we got the lock, means there are no - //keys trying to register for the - //interestOps method -// } + // there is a good chance that we got here + // because the TcpReplicationThread called + // selector wakeup(). + // if that happens, we must ensure that that + // thread has enough time to call interestOps + // synchronized (interestOpsMutex) { + // if we got the lock, means there are no + // keys trying to register for the + // interestOps method + // } continue; // nothing to do } // get an iterator over the set of selected keys Iterator<SelectionKey> it = selector.selectedKeys().iterator(); // look at each key in the selected set - while (it!=null && it.hasNext()) { + while (it != null && it.hasNext()) { SelectionKey key = it.next(); // Is a new connection coming in? if (key.isAcceptable()) { @@ -305,13 +315,10 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB channel.socket().setKeepAlive(getSoKeepAlive()); channel.socket().setOOBInline(getOoBInline()); channel.socket().setReuseAddress(getSoReuseAddress()); - channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + channel.socket().setSoLinger(getSoLingerOn(), getSoLingerTime()); channel.socket().setSoTimeout(getTimeout()); Object attach = new ObjectReader(channel); - registerChannel(selector, - channel, - SelectionKey.OP_READ, - attach); + registerChannel(selector, channel, SelectionKey.OP_READ, attach); } // is there data to read on this channel? if (key.isReadable()) { @@ -334,21 +341,20 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB } serverChannel.close(); - if (datagramChannel!=null) { + if (datagramChannel != null) { try { datagramChannel.close(); - }catch (Exception iox) { + } catch (Exception iox) { if (log.isDebugEnabled()) { log.debug(sm.getString("nioReceiver.closeError"), iox); } } - datagramChannel=null; + datagramChannel = null; } closeSelector(); } - /** * Close Selector. * @@ -365,7 +371,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB int count = 0; while (running && count < 50) { Thread.sleep(100); - count ++; + count++; } if (running) { log.warn(sm.getString("nioReceiver.stop.threadRunning")); @@ -391,16 +397,16 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB key.attach(null); key.cancel(); } - } catch (IOException ignore){ + } catch (IOException ignore) { if (log.isWarnEnabled()) { log.warn(sm.getString("nioReceiver.cleanup.fail"), ignore); } - } catch (ClosedSelectorException ignore){ + } catch (ClosedSelectorException ignore) { // Ignore } try { selector.selectNow(); - } catch (Throwable t){ + } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // Ignore everything else } @@ -410,20 +416,18 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB // ---------------------------------------------------------- /** - * Register the given channel with the given selector for - * the given operations of interest + * Register the given channel with the given selector for the given operations of interest + * * @param selector The selector to use - * @param channel The channel - * @param ops The operations to register - * @param attach Attachment object + * @param channel The channel + * @param ops The operations to register + * @param attach Attachment object + * * @throws Exception IO error with channel */ - protected void registerChannel(Selector selector, - SelectableChannel channel, - int ops, - Object attach) throws Exception { - if (channel == null) - { + protected void registerChannel(Selector selector, SelectableChannel channel, int ops, Object attach) + throws Exception { + if (channel == null) { return; // could happen } // set the new channel non-blocking @@ -451,11 +455,11 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB /** * Sample data handler method for a channel with data ready to read. - * @param key A SelectionKey object associated with a channel - * determined by the selector to be ready for reading. If the - * channel returns an EOF condition, it is closed here, which - * automatically invalidates the associated key. The selector - * will then de-register the channel on the next select call. + * + * @param key A SelectionKey object associated with a channel determined by the selector to be ready for reading. If + * the channel returns an EOF condition, it is closed here, which automatically invalidates the + * associated key. The selector will then de-register the channel on the next select call. + * * @throws Exception IO error with channel */ protected void readDataFromSocket(SelectionKey key) throws Exception { @@ -470,7 +474,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB } } else { // invoking this wakes up the worker thread then returns - //add task to thread pool + // add task to thread pool task.serviceChannel(key); getExecutor().execute(task); } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java index f28648c3b9..f4c91e1abf 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.catalina.tribes.transport.nio; + import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -41,14 +42,11 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * A worker thread class which can drain channels and echo-back the input. Each - * instance is constructed with a reference to the owning thread pool object. - * When started, the thread loops forever waiting to be awakened to service the - * channel associated with a SelectionKey object. The worker is tasked by - * calling its serviceChannel() method with a SelectionKey object. The - * serviceChannel() method stores the key reference in the thread object then - * calls notify() to wake it up. When the channel has been drained, the worker - * thread returns itself to its parent pool. + * A worker thread class which can drain channels and echo-back the input. Each instance is constructed with a reference + * to the owning thread pool object. When started, the thread loops forever waiting to be awakened to service the + * channel associated with a SelectionKey object. The worker is tasked by calling its serviceChannel() method with a + * SelectionKey object. The serviceChannel() method stores the key reference in the thread object then calls notify() to + * wake it up. When the channel has been drained, the worker thread returns itself to its parent pool. */ public class NioReplicationTask extends AbstractRxTask { @@ -60,7 +58,7 @@ public class NioReplicationTask extends AbstractRxTask { private int rxBufSize; private final NioReceiver receiver; - public NioReplicationTask (ListenCallback callback, NioReceiver receiver) { + public NioReplicationTask(ListenCallback callback, NioReceiver receiver) { super(callback); this.receiver = receiver; } @@ -68,12 +66,12 @@ public class NioReplicationTask extends AbstractRxTask { // loop forever waiting for work to do @Override public synchronized void run() { - if ( buffer == null ) { + if (buffer == null) { int size = getRxBufSize(); if (key.channel() instanceof DatagramChannel) { size = ChannelReceiver.MAX_UDP_SIZE; } - if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { + if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(size); } else { buffer = ByteBuffer.allocate(size); @@ -84,135 +82,129 @@ public class NioReplicationTask extends AbstractRxTask { if (key == null) { return; // just in case } - if ( log.isTraceEnabled() ) { - log.trace("Servicing key:"+key); + if (log.isTraceEnabled()) { + log.trace("Servicing key:" + key); } try { - ObjectReader reader = (ObjectReader)key.attachment(); - if ( reader == null ) { - if ( log.isTraceEnabled() ) { - log.trace("No object reader, cancelling:"+key); + ObjectReader reader = (ObjectReader) key.attachment(); + if (reader == null) { + if (log.isTraceEnabled()) { + log.trace("No object reader, cancelling:" + key); } cancelKey(key); } else { - if ( log.isTraceEnabled() ) { - log.trace("Draining channel:"+key); + if (log.isTraceEnabled()) { + log.trace("Draining channel:" + key); } drainChannel(key, reader); } } catch (Exception e) { - //this is common, since the sockets on the other - //end expire after a certain time. - if ( e instanceof CancelledKeyException ) { - //do nothing - } else if ( e instanceof IOException ) { - //don't spew out stack traces for IO exceptions unless debug is enabled. + // this is common, since the sockets on the other + // end expire after a certain time. + if (e instanceof CancelledKeyException) { + // do nothing + } else if (e instanceof IOException) { + // don't spew out stack traces for IO exceptions unless debug is enabled. if (log.isDebugEnabled()) { log.debug(sm.getString("nioReplicationTask.unable.drainChannel.ioe", e.getMessage()), e); } else { - log.warn (sm.getString("nioReplicationTask.unable.drainChannel.ioe", e.getMessage())); + log.warn(sm.getString("nioReplicationTask.unable.drainChannel.ioe", e.getMessage())); } - } else if ( log.isErrorEnabled() ) { - //this is a real error, log it. - log.error(sm.getString("nioReplicationTask.exception.drainChannel"),e); + } else if (log.isErrorEnabled()) { + // this is a real error, log it. + log.error(sm.getString("nioReplicationTask.exception.drainChannel"), e); } cancelKey(key); } key = null; // done, ready for more, return to pool - getTaskPool().returnWorker (this); + getTaskPool().returnWorker(this); } /** - * Called to initiate a unit of work by this worker thread - * on the provided SelectionKey object. This method is - * synchronized, as is the run() method, so only one key - * can be serviced at a given time. - * Before waking the worker thread, and before returning - * to the main selection loop, this key's interest set is - * updated to remove OP_READ. This will cause the selector - * to ignore read-readiness for this channel while the - * worker thread is servicing it. + * Called to initiate a unit of work by this worker thread on the provided SelectionKey object. This method is + * synchronized, as is the run() method, so only one key can be serviced at a given time. Before waking the worker + * thread, and before returning to the main selection loop, this key's interest set is updated to remove OP_READ. + * This will cause the selector to ignore read-readiness for this channel while the worker thread is servicing it. + * * @param key The key to process */ - public synchronized void serviceChannel (SelectionKey key) { - if ( log.isTraceEnabled() ) { - log.trace("About to service key:"+key); + public synchronized void serviceChannel(SelectionKey key) { + if (log.isTraceEnabled()) { + log.trace("About to service key:" + key); } - ObjectReader reader = (ObjectReader)key.attachment(); - if ( reader != null ) { + ObjectReader reader = (ObjectReader) key.attachment(); + if (reader != null) { reader.setLastAccess(System.currentTimeMillis()); } this.key = key; - key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); - key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } /** - * The actual code which drains the channel associated with - * the given key. This method assumes the key has been - * modified prior to invocation to turn off selection - * interest in OP_READ. When this method completes it - * re-enables OP_READ and calls wakeup() on the selector - * so the selector will resume watching this channel. - * @param key The key to process + * The actual code which drains the channel associated with the given key. This method assumes the key has been + * modified prior to invocation to turn off selection interest in OP_READ. When this method completes it re-enables + * OP_READ and calls wakeup() on the selector so the selector will resume watching this channel. + * + * @param key The key to process * @param reader The reader + * * @throws Exception IO error */ - protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { + protected void drainChannel(final SelectionKey key, ObjectReader reader) throws Exception { reader.access(); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); - int count=-1; + int count = -1; SocketAddress saddr = null; if (channel instanceof SocketChannel) { // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) { - reader.append(buffer.array(),0,count,false); + while ((count = channel.read(buffer)) > 0) { + buffer.flip(); // make buffer readable + if (buffer.hasArray()) { + reader.append(buffer.array(), 0, count, false); } else { - reader.append(buffer,count,false); + reader.append(buffer, count, false); } - buffer.clear(); // make buffer empty - //do we have at least one package? - if ( reader.hasPackage() ) { + buffer.clear(); // make buffer empty + // do we have at least one package? + if (reader.hasPackage()) { break; } } } else if (channel instanceof DatagramChannel) { - DatagramChannel dchannel = (DatagramChannel)channel; + DatagramChannel dchannel = (DatagramChannel) channel; saddr = dchannel.receive(buffer); - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) { - reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false); + buffer.flip(); // make buffer readable + if (buffer.hasArray()) { + reader.append(buffer.array(), 0, buffer.limit() - buffer.position(), false); } else { - reader.append(buffer,buffer.limit()-buffer.position(),false); + reader.append(buffer, buffer.limit() - buffer.position(), false); } - buffer.clear(); // make buffer empty - //did we get a package - count = reader.hasPackage()?1:-1; + buffer.clear(); // make buffer empty + // did we get a package + count = reader.hasPackage() ? 1 : -1; } int pkgcnt = reader.count(); - if (count < 0 && pkgcnt == 0 ) { - //end of stream, and no more packages to process + if (count < 0 && pkgcnt == 0) { + // end of stream, and no more packages to process remoteEof(key); return; } - ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); + ChannelMessage[] msgs = pkgcnt == 0 ? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); - registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks + registerForRead(key, reader);// register to read new data, before we send it off to avoid dead locks for (ChannelMessage msg : msgs) { /* - * Use send ack here if you want to ack the request to the remote - * server before completing the request - * This is considered an asynchronous request + * Use send ack here if you want to ack the request to the remote server before completing the request This + * is considered an asynchronous request */ if (ChannelData.sendAckAsync(msg.getOptions())) { sendAck(key, (WritableByteChannel) channel, Constants.ACK_COMMAND, saddr); @@ -220,16 +212,16 @@ public class NioReplicationTask extends AbstractRxTask { try { if (Logs.MESSAGES.isTraceEnabled()) { try { - Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis())); + Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msg.getUniqueId()) + + " at " + new java.sql.Timestamp(System.currentTimeMillis())); } catch (Throwable t) { } } - //process the message + // process the message getCallback().messageDataReceived(msg); /* - * Use send ack here if you want the request to complete on this - * server before sending the ack to the remote server - * This is considered a synchronized request + * Use send ack here if you want the request to complete on this server before sending the ack to the + * remote server This is considered a synchronized request */ if (ChannelData.sendAckSync(msg.getOptions())) { sendAck(key, (WritableByteChannel) channel, Constants.ACK_COMMAND, saddr); @@ -260,54 +252,54 @@ public class NioReplicationTask extends AbstractRxTask { private void remoteEof(SelectionKey key) { // close channel on EOF, invalidates the key - if ( log.isDebugEnabled() ) { + if (log.isDebugEnabled()) { log.debug(sm.getString("nioReplicationTask.disconnect")); } cancelKey(key); } protected void registerForRead(final SelectionKey key, ObjectReader reader) { - if ( log.isTraceEnabled() ) { - log.trace("Adding key for read event:"+key); + if (log.isTraceEnabled()) { + log.trace("Adding key for read event:" + key); } reader.finish(); - //register our OP_READ interest + // register our OP_READ interest Runnable r = () -> { try { if (key.isValid()) { // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); - if ( log.isTraceEnabled() ) { - log.trace("Registering key for read:"+key); + if (log.isTraceEnabled()) { + log.trace("Registering key for read:" + key); } } - } catch (CancelledKeyException ckx ) { + } catch (CancelledKeyException ckx) { NioReceiver.cancelledKey(key); - if ( log.isTraceEnabled() ) { - log.trace("CKX Cancelling key:"+key); + if (log.isTraceEnabled()) { + log.trace("CKX Cancelling key:" + key); } } catch (Exception x) { - log.error(sm.getString("nioReplicationTask.error.register.key", key),x); + log.error(sm.getString("nioReplicationTask.error.register.key", key), x); } }; receiver.addEvent(r); } private void cancelKey(final SelectionKey key) { - if ( log.isTraceEnabled() ) { - log.trace("Adding key for cancel event:"+key); + if (log.isTraceEnabled()) { + log.trace("Adding key for cancel event:" + key); } - ObjectReader reader = (ObjectReader)key.attachment(); - if ( reader != null ) { + ObjectReader reader = (ObjectReader) key.attachment(); + if (reader != null) { reader.setCancelled(true); reader.finish(); } Runnable cx = () -> { - if ( log.isTraceEnabled() ) { - log.trace("Cancelling key:"+key); + if (log.isTraceEnabled()) { + log.trace("Cancelling key:" + key); } NioReceiver.cancelledKey(key); @@ -317,9 +309,10 @@ public class NioReplicationTask extends AbstractRxTask { /** - * Send a reply-acknowledgement (6,2,3), sends it doing a busy write, the ACK is so small - * that it should always go to the buffer. - * @param key The key to use + * Send a reply-acknowledgement (6,2,3), sends it doing a busy write, the ACK is so small that it should always go + * to the buffer. + * + * @param key The key to use * @param channel The channel * @param command The command to write * @param udpaddr Target address @@ -330,22 +323,21 @@ public class NioReplicationTask extends AbstractRxTask { ByteBuffer buf = ByteBuffer.wrap(command); int total = 0; if (channel instanceof DatagramChannel) { - DatagramChannel dchannel = (DatagramChannel)channel; - //were using a shared channel, document says its thread safe - //TODO check optimization, one channel per thread? - while ( total < command.length ) { + DatagramChannel dchannel = (DatagramChannel) channel; + // were using a shared channel, document says its thread safe + // TODO check optimization, one channel per thread? + while (total < command.length) { total += dchannel.send(buf, udpaddr); } } else { - while ( total < command.length ) { + while (total < command.length) { total += channel.write(buf); } } if (log.isTraceEnabled()) { log.trace("ACK sent to " + - ( (channel instanceof SocketChannel) ? - ((SocketChannel)channel).socket().getInetAddress() : - ((DatagramChannel)channel).socket().getInetAddress())); + ((channel instanceof SocketChannel) ? ((SocketChannel) channel).socket().getInetAddress() : + ((DatagramChannel) channel).socket().getInetAddress())); } } catch (IOException x) { log.warn(sm.getString("nioReplicationTask.unable.ack", x.getMessage())); diff --git a/java/org/apache/catalina/tribes/transport/nio/NioSender.java b/java/org/apache/catalina/tribes/transport/nio/NioSender.java index 0090c54035..8e7aedb79f 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioSender.java @@ -33,15 +33,15 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * This class is NOT thread safe and should never be used with more than one thread at a time - * - * This is a state machine, handled by the process method - * States are: - * - NOT_CONNECTED -> connect() -> CONNECTED - * - CONNECTED -> setMessage() -> READY TO WRITE - * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ - * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE - * - TRANSFER_COMPLETE -> CONNECTED + * This class is NOT thread safe and should never be used with more than one thread at a time This is a state machine, + * handled by the process method States are: + * <ul> + * <li>NOT_CONNECTED -> connect() -> CONNECTED</li> + * <li>CONNECTED -> setMessage() -> READY TO WRITE</li> + * <li>READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ</li> + * <li>READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE</li> + * <li>TRANSFER_COMPLETE -> CONNECTED</li> + * </ul> */ public class NioSender extends AbstractSender { @@ -59,7 +59,7 @@ public class NioSender extends AbstractSender { protected ByteBuffer readbuf = null; protected ByteBuffer writebuf = null; protected volatile byte[] current = null; - protected final XByteBuffer ackbuf = new XByteBuffer(128,true); + protected final XByteBuffer ackbuf = new XByteBuffer(128, true); protected int remaining = 0; protected boolean complete; @@ -72,93 +72,96 @@ public class NioSender extends AbstractSender { /** * State machine to send data. - * @param key The key to use + * + * @param key The key to use * @param waitForAck Wait for an ack + * * @return <code>true</code> if the processing was successful + * * @throws IOException An IO error occurred */ public boolean process(SelectionKey key, boolean waitForAck) throws IOException { int ops = key.readyOps(); key.interestOps(key.interestOps() & ~ops); - //in case disconnect has been called + // in case disconnect has been called if ((!isConnected()) && (!connecting)) { throw new IOException(sm.getString("nioSender.sender.disconnected")); } - if ( !key.isValid() ) { + if (!key.isValid()) { throw new IOException(sm.getString("nioSender.key.inValid")); } - if ( key.isConnectable() ) { - if ( socketChannel.finishConnect() ) { + if (key.isConnectable()) { + if (socketChannel.finishConnect()) { completeConnect(); - if ( current != null ) { + if (current != null) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } return false; - } else { - //wait for the connection to finish + } else { + // wait for the connection to finish key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); return false; - }//end if - } else if ( key.isWritable() ) { + } // end if + } else if (key.isWritable()) { boolean writecomplete = write(); - if ( writecomplete ) { - //we are completed, should we read an ack? - if ( waitForAck ) { - //register to read the ack + if (writecomplete) { + // we are completed, should we read an ack? + if (waitForAck) { + // register to read the ack key.interestOps(key.interestOps() | SelectionKey.OP_READ); } else { - //if not, we are ready, setMessage will reregister us for another write interest - //do a health check, we have no way of verify a disconnected - //socket since we don't register for OP_READ on waitForAck=false - read();//this causes overhead - setRequestCount(getRequestCount()+1); + // if not, we are ready, setMessage will reregister us for another write interest + // do a health check, we have no way of verify a disconnected + // socket since we don't register for OP_READ on waitForAck=false + read();// this causes overhead + setRequestCount(getRequestCount() + 1); return true; } } else { - //we are not complete, lets write some more - key.interestOps(key.interestOps()|SelectionKey.OP_WRITE); - }//end if - } else if ( key.isReadable() ) { + // we are not complete, lets write some more + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } // end if + } else if (key.isReadable()) { boolean readcomplete = read(); - if ( readcomplete ) { - setRequestCount(getRequestCount()+1); + if (readcomplete) { + setRequestCount(getRequestCount() + 1); return true; } else { key.interestOps(key.interestOps() | SelectionKey.OP_READ); - }//end if + } // end if } else { - //unknown state, should never happen + // unknown state, should never happen log.warn(sm.getString("nioSender.unknown.state", Integer.toString(ops))); throw new IOException(sm.getString("nioSender.unknown.state", Integer.toString(ops))); - }//end if + } // end if return false; } private void configureSocket() throws IOException { - if (socketChannel!=null) { + if (socketChannel != null) { socketChannel.configureBlocking(false); socketChannel.socket().setSendBufferSize(getTxBufSize()); socketChannel.socket().setReceiveBufferSize(getRxBufSize()); - socketChannel.socket().setSoTimeout((int)getTimeout()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0); + socketChannel.socket().setSoTimeout((int) getTimeout()); + socketChannel.socket().setSoLinger(getSoLingerOn(), getSoLingerOn() ? getSoLingerTime() : 0); socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); socketChannel.socket().setKeepAlive(getSoKeepAlive()); socketChannel.socket().setReuseAddress(getSoReuseAddress()); socketChannel.socket().setOOBInline(getOoBInline()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + socketChannel.socket().setSoLinger(getSoLingerOn(), getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); - } else if (dataChannel!=null) { + } else if (dataChannel != null) { dataChannel.configureBlocking(false); dataChannel.socket().setSendBufferSize(getUdpTxBufSize()); dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); - dataChannel.socket().setSoTimeout((int)getTimeout()); + dataChannel.socket().setSoTimeout((int) getTimeout()); dataChannel.socket().setReuseAddress(getSoReuseAddress()); dataChannel.socket().setTrafficClass(getSoTrafficClass()); } } private void completeConnect() { - //we connected, register ourselves for writing + // we connected, register ourselves for writing setConnected(true); connecting = false; setRequestCount(0); @@ -166,27 +169,26 @@ public class NioSender extends AbstractSender { } - protected boolean read() throws IOException { - //if there is no message here, we are done - if ( current == null ) { + // if there is no message here, we are done + if (current == null) { return true; } - int read = isUdpBased()?dataChannel.read(readbuf) : socketChannel.read(readbuf); - //end of stream - if ( read == -1 ) { + int read = isUdpBased() ? dataChannel.read(readbuf) : socketChannel.read(readbuf); + // end of stream + if (read == -1) { throw new IOException(sm.getString("nioSender.unable.receive.ack")); - } else if ( read == 0 ) { + } else if (read == 0) { return false; } readbuf.flip(); - ackbuf.append(readbuf,read); + ackbuf.append(readbuf, read); readbuf.clear(); - if (ackbuf.doesPackageExist() ) { + if (ackbuf.doesPackageExist()) { byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes(); - boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA); - boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA); - if ( fack && getThrowOnFailedAck() ) { + boolean ack = Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.ACK_DATA); + boolean fack = Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA); + if (fack && getThrowOnFailedAck()) { throw new RemoteProcessException(sm.getString("nioSender.receive.failedAck")); } return ack || fack; @@ -197,66 +199,66 @@ public class NioSender extends AbstractSender { protected boolean write() throws IOException { - if ( (!isConnected()) || (this.socketChannel==null && this.dataChannel==null)) { + if ((!isConnected()) || (this.socketChannel == null && this.dataChannel == null)) { throw new IOException(sm.getString("nioSender.not.connected")); } - if ( current != null ) { - if ( remaining > 0 ) { - //we have written everything, or we are starting a new package - //protect against buffer overwrite - int byteswritten = isUdpBased()?dataChannel.write(writebuf) : socketChannel.write(writebuf); + if (current != null) { + if (remaining > 0) { + // we have written everything, or we are starting a new package + // protect against buffer overwrite + int byteswritten = isUdpBased() ? dataChannel.write(writebuf) : socketChannel.write(writebuf); remaining -= byteswritten; - //if the entire message was written from the buffer - //reset the position counter - if ( remaining < 0 ) { + // if the entire message was written from the buffer + // reset the position counter + if (remaining < 0) { remaining = 0; } } - return (remaining==0); + return (remaining == 0); } - //no message to send, we can consider that complete + // no message to send, we can consider that complete return true; } @Override public synchronized void connect() throws IOException { - if ( connecting || isConnected()) { + if (connecting || isConnected()) { return; } connecting = true; - if ( isConnected() ) { + if (isConnected()) { throw new IOException(sm.getString("nioSender.already.connected")); } - if ( readbuf == null ) { + if (readbuf == null) { readbuf = getReadBuffer(); } else { readbuf.clear(); } - if ( writebuf == null ) { + if (writebuf == null) { writebuf = getWriteBuffer(); } else { writebuf.clear(); } if (isUdpBased()) { - InetSocketAddress daddr = new InetSocketAddress(getAddress(),getUdpPort()); - if ( dataChannel != null ) { + InetSocketAddress daddr = new InetSocketAddress(getAddress(), getUdpPort()); + if (dataChannel != null) { throw new IOException(sm.getString("nioSender.datagram.already.established")); } dataChannel = DatagramChannel.open(); configureSocket(); dataChannel.connect(daddr); completeConnect(); - dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this); + dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } else { - InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); - if ( socketChannel != null ) { + InetSocketAddress addr = new InetSocketAddress(getAddress(), getPort()); + if (socketChannel != null) { throw new IOException(sm.getString("nioSender.socketChannel.already.established")); } socketChannel = SocketChannel.open(); configureSocket(); - if ( socketChannel.connect(addr) ) { + if (socketChannel.connect(addr)) { completeConnect(); socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } else { @@ -278,10 +280,10 @@ public class NioSender extends AbstractSender { } catch (Exception x) { // Ignore } - //error free close, all the way - //try {socket.shutdownOutput();}catch ( Exception x){} - //try {socket.shutdownInput();}catch ( Exception x){} - //try {socket.close();}catch ( Exception x){} + // error free close, all the way + // try {socket.shutdownOutput();}catch ( Exception x){} + // try {socket.shutdownInput();}catch ( Exception x){} + // try {socket.close();}catch ( Exception x){} try { socketChannel.close(); } catch (Exception x) { @@ -298,10 +300,10 @@ public class NioSender extends AbstractSender { } catch (Exception x) { // Ignore } - //error free close, all the way - //try {socket.shutdownOutput();}catch ( Exception x){} - //try {socket.shutdownInput();}catch ( Exception x){} - //try {socket.close();}catch ( Exception x){} + // error free close, all the way + // try {socket.shutdownOutput();}catch ( Exception x){} + // try {socket.shutdownInput();}catch ( Exception x){} + // try {socket.close();}catch ( Exception x){} try { dataChannel.close(); } catch (Exception x) { @@ -311,22 +313,22 @@ public class NioSender extends AbstractSender { dataChannel = null; } } - } catch ( Exception x ) { + } catch (Exception x) { log.error(sm.getString("nioSender.unable.disconnect", x.getMessage())); - if ( log.isDebugEnabled() ) { - log.debug(sm.getString("nioSender.unable.disconnect", x.getMessage()),x); + if (log.isDebugEnabled()) { + log.debug(sm.getString("nioSender.unable.disconnect", x.getMessage()), x); } } } public void reset() { - if ( isConnected() && readbuf == null) { + if (isConnected() && readbuf == null) { readbuf = getReadBuffer(); } - if ( readbuf != null ) { + if (readbuf != null) { readbuf.clear(); } - if ( writebuf != null ) { + if (writebuf != null) { writebuf.clear(); } current = null; @@ -346,20 +348,21 @@ public class NioSender extends AbstractSender { } private ByteBuffer getBuffer(int size) { - return getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size); + return getDirectBuffer() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); } /** * Send message. * * @param data ChannelMessage + * * @throws IOException if an error occurs */ public void setMessage(byte[] data) throws IOException { - setMessage(data,0,data.length); + setMessage(data, 0, data.length); } - public void setMessage(byte[] data,int offset, int length) throws IOException { + public void setMessage(byte[] data, int offset, int length) throws IOException { if (data != null) { synchronized (this) { current = data; @@ -375,7 +378,7 @@ public class NioSender extends AbstractSender { } // TODO use ByteBuffer.wrap to avoid copying the data. - writebuf.put(data,offset,length); + writebuf.put(data, offset, length); writebuf.flip(); if (isConnected()) { if (isUdpBased()) { diff --git a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java index f73347d5ff..2c0f7ca850 100644 --- a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java @@ -52,7 +52,7 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende private final InternalState state; - protected final long selectTimeout = 5000; //default 5 seconds, same as send timeout + protected final long selectTimeout = 5000; // default 5 seconds, same as send timeout public ParallelNioSender() throws IOException { state = new InternalState(Selector.open()); @@ -62,25 +62,23 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende @Override - public synchronized void sendMessage(Member[] destination, ChannelMessage msg) - throws ChannelException { + public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { long start = System.currentTimeMillis(); - this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) == Channel.SEND_OPTIONS_UDP); - byte[] data = XByteBuffer.createDataPackage((ChannelData)msg); + this.setUdpBased((msg.getOptions() & Channel.SEND_OPTIONS_UDP) == Channel.SEND_OPTIONS_UDP); + byte[] data = XByteBuffer.createDataPackage((ChannelData) msg); NioSender[] senders = setupForSend(destination); connect(senders); - setData(senders,data); + setData(senders, data); int remaining = senders.length; ChannelException cx = null; try { - //loop until complete, an error happens, or we timeout + // loop until complete, an error happens, or we timeout long delta = System.currentTimeMillis() - start; - boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & - msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK; - while ( (remaining>0) && (delta<getTimeout()) ) { + boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK; + while ((remaining > 0) && (delta < getTimeout())) { try { - SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg); + SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(), waitForAck, msg); remaining -= result.getCompleted(); if (result.getFailed() != null) { remaining -= result.getFailed().getFaultyMembers().length; @@ -90,13 +88,13 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende cx.addFaultyMember(result.getFailed().getFaultyMembers()); } } - } catch (Exception x ) { + } catch (Exception x) { if (log.isTraceEnabled()) { log.trace("Error sending message", x); } if (cx == null) { - if ( x instanceof ChannelException ) { - cx = (ChannelException)x; + if (x instanceof ChannelException) { + cx = (ChannelException) x; } else { cx = new ChannelException(sm.getString("parallelNioSender.send.failed"), x); } @@ -110,13 +108,13 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende } delta = System.currentTimeMillis() - start; } - if ( remaining > 0 ) { - //timeout has occurred - ChannelException cxtimeout = new ChannelException(sm.getString( - "parallelNioSender.operation.timedout", Long.toString(getTimeout()))); + if (remaining > 0) { + // timeout has occurred + ChannelException cxtimeout = new ChannelException( + sm.getString("parallelNioSender.operation.timedout", Long.toString(getTimeout()))); if (cx == null) { - cx = new ChannelException(sm.getString("parallelNioSender.operation.timedout", - Long.toString(getTimeout()))); + cx = new ChannelException( + sm.getString("parallelNioSender.operation.timedout", Long.toString(getTimeout()))); } for (NioSender sender : senders) { if (!sender.isComplete()) { @@ -124,18 +122,18 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende } } throw cx; - } else if ( cx != null ) { - //there was an error + } else if (cx != null) { + // there was an error throw cx; } - } catch (Exception x ) { + } catch (Exception x) { try { this.disconnect(); } catch (Exception e) { // Ignore } - if ( x instanceof ChannelException ) { - throw (ChannelException)x; + if (x instanceof ChannelException) { + throw (ChannelException) x; } else { throw new ChannelException(x); } @@ -165,44 +163,45 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende sk.interestOps(sk.interestOps() & ~readyOps); NioSender sender = (NioSender) sk.attachment(); try { - if (sender.process(sk,waitForAck)) { + if (sender.process(sk, waitForAck)) { sender.setComplete(true); result.complete(sender); - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + - new UniqueId(msg.getUniqueId()) + " at " + + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + sender.getDestination().getName()); } SenderState.getSenderState(sender.getDestination()).setReady(); - }//end if + } // end if } catch (Exception x) { if (log.isTraceEnabled()) { - log.trace("Error while processing send to " + sender.getDestination().getName(), - x); + log.trace("Error while processing send to " + sender.getDestination().getName(), x); } SenderState state = SenderState.getSenderState(sender.getDestination()); - int attempt = sender.getAttempt()+1; - boolean retry = (attempt <= maxAttempts && maxAttempts>0); + int attempt = sender.getAttempt() + 1; + boolean retry = (attempt <= maxAttempts && maxAttempts > 0); synchronized (state) { - //sk.cancel(); + // sk.cancel(); if (state.isSuspect()) { state.setFailing(); } if (state.isReady()) { state.setSuspect(); - if ( retry ) { - log.warn(sm.getString("parallelNioSender.send.fail.retrying", sender.getDestination().getName())); + if (retry) { + log.warn(sm.getString("parallelNioSender.send.fail.retrying", + sender.getDestination().getName())); } else { log.warn(sm.getString("parallelNioSender.send.fail", sender.getDestination().getName()), x); } } } - if ( !isConnected() ) { - log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", sender.getDestination().getName())); - ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x); - cx.addFaultyMember(sender.getDestination(),x); + if (!isConnected()) { + log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", + sender.getDestination().getName())); + ChannelException cx = + new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x); + cx.addFaultyMember(sender.getDestination(), x); result.failed(cx); break; } @@ -214,17 +213,15 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); - } catch (Exception ignore){ + } catch (Exception ignore) { state.setFailing(); } } else { - ChannelException cx = new ChannelException( - sm.getString("parallelNioSender.sendFailed.attempt", - Integer.toString(sender.getAttempt()), - Integer.toString(maxAttempts)), x); - cx.addFaultyMember(sender.getDestination(),x); + ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sendFailed.attempt", + Integer.toString(sender.getAttempt()), Integer.toString(maxAttempts)), x); + cx.addFaultyMember(sender.getDestination(), x); result.failed(cx); - }//end if + } // end if } } return result; @@ -233,15 +230,18 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende private static class SendResult { private List<NioSender> completeSenders = new ArrayList<>(); private ChannelException exception = null; + private void complete(NioSender sender) { if (!completeSenders.contains(sender)) { completeSenders.add(sender); } } + private int getCompleted() { return completeSenders.size(); } - private void failed(ChannelException cx){ + + private void failed(ChannelException cx) { if (exception == null) { exception = cx; } @@ -265,7 +265,7 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende x.addFaultyMember(sender.getDestination(), io); } } - if ( x != null ) { + if (x != null) { throw x; } } @@ -282,7 +282,7 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende x.addFaultyMember(sender.getDestination(), io); } } - if ( x != null ) { + if (x != null) { throw x; } } @@ -291,7 +291,7 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende private NioSender[] setupForSend(Member[] destination) throws ChannelException { ChannelException cx = null; NioSender[] result = new NioSender[destination.length]; - for ( int i=0; i<destination.length; i++ ) { + for (int i = 0; i < destination.length; i++) { NioSender sender = state.nioSenders.get(destination[i]); try { @@ -305,14 +305,14 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende sender.setSelector(state.selector); sender.setUdpBased(isUdpBased()); result[i] = sender; - }catch ( UnknownHostException x ) { + } catch (UnknownHostException x) { if (cx == null) { cx = new ChannelException(sm.getString("parallelNioSender.unable.setup.NioSender"), x); } cx.addFaultyMember(destination[i], x); } } - if ( cx != null ) { + if (cx != null) { throw cx; } else { return result; @@ -321,12 +321,12 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende @Override public void connect() { - //do nothing, we connect on demand + // do nothing, we connect on demand setConnected(true); } - private synchronized void close() throws ChannelException { + private synchronized void close() throws ChannelException { ChannelException x = null; Iterator<Map.Entry<Member,NioSender>> iter = state.nioSenders.entrySet().iterator(); while (iter.hasNext()) { @@ -353,9 +353,9 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende @Override public void remove(Member member) { - //disconnect senders + // disconnect senders NioSender sender = state.nioSenders.remove(member); - if ( sender != null ) { + if (sender != null) { sender.disconnect(); } } @@ -375,29 +375,32 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende public synchronized boolean keepalive() { boolean result = false; for (Iterator<Entry<Member,NioSender>> i = state.nioSenders.entrySet().iterator(); i.hasNext();) { - Map.Entry<Member, NioSender> entry = i.next(); + Map.Entry<Member,NioSender> entry = i.next(); NioSender sender = entry.getValue(); - if ( sender.keepalive() ) { - //nioSenders.remove(entry.getKey()); + if (sender.keepalive()) { + // nioSenders.remove(entry.getKey()); i.remove(); result = true; } else { try { sender.read(); - }catch ( IOException x ) { + } catch (IOException x) { sender.disconnect(); sender.reset(); - //nioSenders.remove(entry.getKey()); + // nioSenders.remove(entry.getKey()); i.remove(); result = true; - }catch ( Exception x ) { - log.warn(sm.getString("parallelNioSender.error.keepalive", sender),x); + } catch (Exception x) { + log.warn(sm.getString("parallelNioSender.error.keepalive", sender), x); } } } - //clean up any cancelled keys - if ( result ) { - try { state.selector.selectNow(); }catch (Exception e){/*Ignore*/} + // clean up any cancelled keys + if (result) { + try { + state.selector.selectNow(); + } catch (Exception e) { + /* Ignore */} } return result; } @@ -406,7 +409,7 @@ public class ParallelNioSender extends AbstractSender implements MultiPointSende private static class InternalState implements Runnable { private final Selector selector; - private final HashMap<Member, NioSender> nioSenders = new HashMap<>(); + private final HashMap<Member,NioSender> nioSenders = new HashMap<>(); private InternalState(Selector selector) { this.selector = selector; diff --git a/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java b/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java index d344022574..7ad0c8f173 100644 --- a/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java @@ -33,13 +33,13 @@ public class PooledParallelSender extends PooledSender implements PooledParallel if (!isConnected()) { throw new ChannelException(sm.getString("pooledParallelSender.sender.disconnected")); } - ParallelNioSender sender = (ParallelNioSender)getSender(); + ParallelNioSender sender = (ParallelNioSender) getSender(); if (sender == null) { - ChannelException cx = new ChannelException(sm.getString( - "pooledParallelSender.unable.retrieveSender.timeout", - Long.toString(getMaxWait()))); + ChannelException cx = new ChannelException( + sm.getString("pooledParallelSender.unable.retrieveSender.timeout", Long.toString(getMaxWait()))); for (Member member : destination) { - cx.addFaultyMember(member, new NullPointerException(sm.getString("pooledParallelSender.unable.retrieveSender"))); + cx.addFaultyMember(member, + new NullPointerException(sm.getString("pooledParallelSender.unable.retrieveSender"))); } throw cx; } else { @@ -62,10 +62,10 @@ public class PooledParallelSender extends PooledSender implements PooledParallel public DataSender getNewDataSender() { try { ParallelNioSender sender = new ParallelNioSender(); - transferProperties(this,sender); + transferProperties(this, sender); return sender; - } catch ( IOException x ) { - throw new RuntimeException(sm.getString("pooledParallelSender.unable.open"),x); + } catch (IOException x) { + throw new RuntimeException(sm.getString("pooledParallelSender.unable.open"), x); } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org