Author: kfujino Date: Wed Oct 18 08:27:32 2017 New Revision: 1812472 URL: http://svn.apache.org/viewvc?rev=1812472&view=rev Log: - Ensure that the remaining Senders can still send channel messages by avoiding an unintended ChannelException caused by comparing the number of failed members with the number of remaining Senders. - Ensure that any remaining SelectionKeys that were left unhandled because a ChannelException was thrown during SelectionKey processing are still handled.
Modified: tomcat/tc8.5.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java tomcat/tc8.5.x/trunk/webapps/docs/changelog.xml Modified: tomcat/tc8.5.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java URL: http://svn.apache.org/viewvc/tomcat/tc8.5.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812472&r1=1812471&r2=1812472&view=diff ============================================================================== --- tomcat/tc8.5.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original) +++ tomcat/tc8.5.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Wed Oct 18 08:27:32 2017 @@ -20,8 +20,10 @@ import java.io.IOException; import java.net.UnknownHostException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -73,25 +75,26 @@ public class ParallelNioSender extends A msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK; while ( (remaining>0) && (delta<getTimeout()) ) { try { - remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg); + SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg); + remaining -= result.getCompleted(); + if (result.getFailed() != null) { + remaining -= result.getFailed().getFaultyMembers().length; + if (cx == null) cx = result.getFailed(); + else cx.addFaultyMember(result.getFailed().getFaultyMembers()); + } } catch (Exception x ) { if (log.isTraceEnabled()) log.trace("Error sending message", x); - int faulty = (cx == null)?0:cx.getFaultyMembers().length; - if ( cx == null ) { + if (cx == null) { if ( x instanceof ChannelException ) cx = (ChannelException)x; else cx = new ChannelException(sm.getString("parallelNioSender.send.failed"), x); - } else { - if (x instanceof ChannelException) { - 
cx.addFaultyMember(((ChannelException) x).getFaultyMembers()); - } } - //count down the remaining on an error - if (faulty < cx.getFaultyMembers().length) { - remaining -= (cx.getFaultyMembers().length - faulty); + for (int i=0; i<senders.length; i++ ) { + if (!senders[i].isComplete()) { + cx.addFaultyMember(senders[i].getDestination(),x); + } } + throw cx; } - //bail out if all remaining senders are failing - if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx; delta = System.currentTimeMillis() - start; } if ( remaining > 0 ) { @@ -120,13 +123,17 @@ public class ParallelNioSender extends A } - private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) - throws IOException, ChannelException { - int completed = 0; - int selectedKeys = selector.select(selectTimeOut); - + private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) + throws ChannelException { + SendResult result = new SendResult(); + int selectedKeys; + try { + selectedKeys = selector.select(selectTimeOut); + } catch (IOException ioe) { + throw new ChannelException(sm.getString("parallelNioSender.send.failed"), ioe); + } if (selectedKeys == 0) { - return 0; + return result; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); @@ -138,8 +145,8 @@ public class ParallelNioSender extends A NioSender sender = (NioSender) sk.attachment(); try { if (sender.process(sk,waitForAck)) { - completed++; sender.setComplete(true); + result.complete(sender); if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " + @@ -172,17 +179,18 @@ public class ParallelNioSender extends A log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", sender.getDestination().getName())); ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x); 
cx.addFaultyMember(sender.getDestination(),x); - throw cx; + result.failed(cx); + break; } byte[] data = sender.getMessage(); - if ( retry ) { + if (retry) { try { sender.disconnect(); sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); - }catch ( Exception ignore){ + } catch (Exception ignore){ state.setFailing(); } } else { @@ -191,12 +199,31 @@ public class ParallelNioSender extends A Integer.toString(sender.getAttempt()), Integer.toString(maxAttempts)), x); cx.addFaultyMember(sender.getDestination(),x); - throw cx; + result.failed(cx); }//end if } } - return completed; + return result; + + } + + private static class SendResult { + private List<NioSender> completeSenders = new ArrayList<>(); + private ChannelException exception = null; + private void complete(NioSender sender) { + if (!completeSenders.contains(sender)) completeSenders.add(sender); + } + private int getCompleted() { + return completeSenders.size(); + } + private void failed(ChannelException cx){ + if (exception == null) exception = cx; + exception.addFaultyMember(cx.getFaultyMembers()); + } + private ChannelException getFailed() { + return exception; + } } private void connect(NioSender[] senders) throws ChannelException { Modified: tomcat/tc8.5.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc8.5.x/trunk/webapps/docs/changelog.xml?rev=1812472&r1=1812471&r2=1812472&view=diff ============================================================================== --- tomcat/tc8.5.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc8.5.x/trunk/webapps/docs/changelog.xml Wed Oct 18 08:27:32 2017 @@ -114,6 +114,16 @@ than the actual setting value of <code>maxRetryAttempts</code>. (kfujino) </fix> + <fix> + Ensure that the remaining Sender can send channel messages by avoiding + unintended <code>ChannelException</code> caused by comparing the number + of failed members and the number of remaining Senders. 
(kfujino) + </fix> + <fix> + Ensure that remaining SelectionKeys that were not handled by throwing a + <code>ChannelException</code> during SelectionKey processing are + handled. (kfujino) + </fix> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org