Author: kfujino
Date: Wed Oct 18 08:30:46 2017
New Revision: 1812474
URL: http://svn.apache.org/viewvc?rev=1812474&view=rev
Log:
-Ensure that the remaining Sender can send channel messages by avoiding
unintended ChannelException caused by comparing the number of failed members
and the number of remaining Senders.
-Ensure that remaining SelectionKeys that were not handled by throwing a
ChannelException during SelectionKey processing are handled.
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812474&r1=1812473&r2=1812474&view=diff
==============================================================================
---
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
(original)
+++
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
Wed Oct 18 08:30:46 2017
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -86,21 +88,26 @@ public class ParallelNioSender extends A
boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK &
msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
while ( (remaining>0) && (delta<getTimeout()) ) {
try {
- remaining -= doLoop(selectTimeout,
getMaxRetryAttempts(),waitForAck,msg);
+ SendResult result = doLoop(selectTimeout,
getMaxRetryAttempts(),waitForAck,msg);
+ remaining -= result.getCompleted();
+ if (result.getFailed() != null) {
+ remaining -=
result.getFailed().getFaultyMembers().length;
+ if (cx == null) cx = result.getFailed();
+ else
cx.addFaultyMember(result.getFailed().getFaultyMembers());
+ }
} catch (Exception x ) {
if (log.isTraceEnabled()) log.trace("Error sending
message", x);
- int faulty = (cx == null)?0:cx.getFaultyMembers().length;
- if ( cx == null ) {
+ if (cx == null) {
if ( x instanceof ChannelException ) cx =
(ChannelException)x;
else cx = new ChannelException("Parallel NIO send
failed.", x);
- } else {
- if (x instanceof ChannelException) cx.addFaultyMember(
( (ChannelException) x).getFaultyMembers());
}
- //count down the remaining on an error
- if (faulty<cx.getFaultyMembers().length) remaining -=
(cx.getFaultyMembers().length-faulty);
+ for (int i=0; i<senders.length; i++ ) {
+ if (!senders[i].isComplete()) {
+ cx.addFaultyMember(senders[i].getDestination(),x);
+ }
+ }
+ throw cx;
}
- //bail out if all remaining senders are failing
- if ( cx != null && cx.getFaultyMembers().length == remaining )
throw cx;
delta = System.currentTimeMillis() - start;
}
if ( remaining > 0 ) {
@@ -123,12 +130,17 @@ public class ParallelNioSender extends A
}
- private int doLoop(long selectTimeOut, int maxAttempts, boolean
waitForAck, ChannelMessage msg) throws IOException, ChannelException {
- int completed = 0;
- int selectedKeys = selector.select(selectTimeOut);
+ private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean
waitForAck, ChannelMessage msg) throws ChannelException {
+ SendResult result = new SendResult();
+ int selectedKeys;
+ try {
+ selectedKeys = selector.select(selectTimeOut);
+ } catch (IOException ioe) {
+ throw new ChannelException("Parallel NIO send failed.", ioe);
+ }
if (selectedKeys == 0) {
- return 0;
+ return result;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
@@ -140,8 +152,8 @@ public class ParallelNioSender extends A
NioSender sender = (NioSender) sk.attachment();
try {
if (sender.process(sk,waitForAck)) {
- completed++;
sender.setComplete(true);
+ result.complete(sender);
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" +
new UniqueId(msg.getUniqueId()) + " at " +new
java.sql.Timestamp(System.currentTimeMillis())+ " to
"+sender.getDestination().getName());
}
@@ -170,28 +182,47 @@ public class ParallelNioSender extends A
log.warn("Not retrying send for:" +
sender.getDestination().getName() + "; Sender is disconnected.");
ChannelException cx = new ChannelException("Send failed,
and sender is disconnected. Not retrying.",x);
cx.addFaultyMember(sender.getDestination(),x);
- throw cx;
+ result.failed(cx);
+ break;
}
byte[] data = sender.getMessage();
- if ( retry ) {
+ if (retry) {
try {
sender.disconnect();
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
- }catch ( Exception ignore){
+ } catch ( Exception ignore){
state.setFailing();
}
} else {
ChannelException cx = new ChannelException("Send failed,
attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
cx.addFaultyMember(sender.getDestination(),x);
- throw cx;
+ result.failed(cx);
}//end if
}
}
- return completed;
+ return result;
+ }
+
+ private static class SendResult {
+ private List<NioSender> completeSenders = new ArrayList<NioSender>();
+ private ChannelException exception = null;
+ private void complete(NioSender sender) {
+ if (!completeSenders.contains(sender)) completeSenders.add(sender);
+ }
+ private int getCompleted() {
+ return completeSenders.size();
+ }
+ private void failed(ChannelException cx){
+ if (exception == null) exception = cx;
+ exception.addFaultyMember(cx.getFaultyMembers());
+ }
+ private ChannelException getFailed() {
+ return exception;
+ }
}
private void connect(NioSender[] senders) throws ChannelException {
Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1812474&r1=1812473&r2=1812474&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Oct 18 08:30:46 2017
@@ -116,6 +116,16 @@
than the actual setting value of <code>maxRetryAttempts</code>.
(kfujino)
</fix>
+ <fix>
+ Ensure that the remaining Sender can send channel messages by avoiding
+ unintended <code>ChannelException</code> caused by comparing the number
+ of failed members and the number of remaining Senders. (kfujino)
+ </fix>
+ <fix>
+ Ensure that remaining SelectionKeys that were not handled by throwing a
+ <code>ChannelException</code> during SelectionKey processing are
+ handled. (kfujino)
+ </fix>
</changelog>
</subsection>
<subsection name="Other">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]