Author: kfujino Date: Wed Oct 18 08:25:03 2017 New Revision: 1812471 URL: http://svn.apache.org/viewvc?rev=1812471&view=rev Log: Ensure that remaining SelectionKeys that were not handled by throwing a ChannelException during SelectionKey processing are handled.
Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812471&r1=1812470&r2=1812471&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Wed Oct 18 08:25:03 2017 @@ -20,8 +20,10 @@ import java.io.IOException; import java.net.UnknownHostException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -73,22 +75,25 @@ public class ParallelNioSender extends A msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK; while ( (remaining>0) && (delta<getTimeout()) ) { try { - remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg); + SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg); + remaining -= result.getCompleted(); + if (result.getFailed() != null) { + remaining -= result.getFailed().getFaultyMembers().length; + if (cx == null) cx = result.getFailed(); + else cx.addFaultyMember(result.getFailed().getFaultyMembers()); + } } catch (Exception x ) { if (log.isTraceEnabled()) log.trace("Error sending message", x); - int faulty = (cx == null)?0:cx.getFaultyMembers().length; - if ( cx == null ) { + if (cx == null) { if ( x instanceof ChannelException ) cx = (ChannelException)x; else cx = new ChannelException(sm.getString("parallelNioSender.send.failed"), x); - } else { - if (x instanceof ChannelException) { - cx.addFaultyMember(((ChannelException) x).getFaultyMembers()); - } } - //count down the remaining on an error - if (faulty < cx.getFaultyMembers().length) { - remaining -= (cx.getFaultyMembers().length - faulty); + for (int i=0; i<senders.length; i++ ) { + if (!senders[i].isComplete()) { + cx.addFaultyMember(senders[i].getDestination(),x); + } } + throw cx; } delta = System.currentTimeMillis() - start; } @@ -118,13 +123,18 @@ public class ParallelNioSender extends A } - private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) - throws IOException, ChannelException { - int completed = 0; - int selectedKeys = selector.select(selectTimeOut); + private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) + throws ChannelException { + SendResult result = new SendResult(); + int selectedKeys; + try { + selectedKeys = selector.select(selectTimeOut); + } catch (IOException ioe) { + throw new ChannelException(sm.getString("parallelNioSender.send.failed"), ioe); + } if (selectedKeys == 0) { - return 0; + return result; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); @@ -136,8 +146,8 @@ public class ParallelNioSender extends A NioSender sender = (NioSender) sk.attachment(); try { if (sender.process(sk,waitForAck)) { - completed++; sender.setComplete(true); + result.complete(sender); if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " + @@ -170,17 +180,18 @@ public class ParallelNioSender extends A log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", sender.getDestination().getName())); ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x); cx.addFaultyMember(sender.getDestination(),x); - throw cx; + result.failed(cx); + break; } byte[] data = sender.getMessage(); - if ( retry ) { + if (retry) { try { sender.disconnect(); sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); - }catch ( Exception ignore){ + } catch (Exception ignore){ state.setFailing(); } } else { @@ -189,12 +200,31 @@ public class ParallelNioSender extends A Integer.toString(sender.getAttempt()), Integer.toString(maxAttempts)), x); cx.addFaultyMember(sender.getDestination(),x); - throw cx; + result.failed(cx); }//end if } } - return completed; + return result; + + } + + private static class SendResult { + private List<NioSender> completeSenders = new ArrayList<>(); + private ChannelException exception = null; + private void complete(NioSender sender) { + if (!completeSenders.contains(sender)) completeSenders.add(sender); + } + private int getCompleted() { + return completeSenders.size(); + } + private void failed(ChannelException cx){ + if (exception == null) exception = cx; + exception.addFaultyMember(cx.getFaultyMembers()); + } + private ChannelException getFailed() { + return exception; + } } private void connect(NioSender[] senders) throws ChannelException { Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1812471&r1=1812470&r2=1812471&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Wed Oct 18 08:25:03 2017 @@ -118,6 +118,11 @@ unintended <code>ChannelException</code> caused by comparing the number of failed members and the number of remaining Senders. (kfujino) </fix> + <fix> + Ensure that remaining SelectionKeys that were not handled by throwing a + <code>ChannelException</code> during SelectionKey processing are + handled. (kfujino) + </fix> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org