Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][NET] Skip Channel Write on Connection Failure ......................................................................
[NO ISSUE][NET] Skip Channel Write on Connection Failure - user model changes: no - storage format changes: no - interface changes: no Details: - Do not attempt to adjust channel writability if the failure was due to connection failure since no more messages will be sent on that connection. This is done to prevent a possible deadlock between network IOThread that detected connection failure and another thread that might be accessing the channel. - Make sending error code conditions more explicit since we currently have a single error code that is sent. Change-Id: Ic25f05ac2c0d02699324f2d1b80c51f392654106 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2892 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java 6 files changed, 16 insertions(+), 16 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java index 43c1542..9ad870c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java @@ -46,10 +46,10 @@ } else { adjustChannelWritability(); } - } else if (ecode >= 0 && !ecodeSent) { + } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) { writerState.getCommand().setChannelId(channelId); writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR); - writerState.getCommand().setData(ecode); + writerState.getCommand().setData(REMOTE_ERROR_CODE); writerState.reset(null, 0, null); ecodeSent = true; ccb.reportLocalEOS(); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java index 5ce29c2..f32adcc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java @@ -66,7 +66,7 @@ } } if (hasFailed()) { - if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) { + if (errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE) { throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR); } // Do not throw exception here to allow the root cause exception gets propagated to the master first. diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java index d8dc4b9..becbb00 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java @@ -158,7 +158,7 @@ // Note: if a remote failure overwrites the value of localFailure, then we rely on // the fact that the remote task will notify the cc of the failure. // Otherwise, the local task must fail - localFailure = errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE; + localFailure = errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE; failSenders.set(senderIndex); eosSenders.set(senderIndex); notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java index 7ed9bfa..0a28e93 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.comm.IBufferAcceptor; import org.apache.hyracks.api.comm.IChannelControlBlock; @@ -32,18 +33,18 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface { public static final int NO_ERROR_CODE = 0; + public static final int CONNECTION_LOST_ERROR_CODE = -1; public static final int REMOTE_ERROR_CODE = 1; - public static final int LOCAL_ERROR_CODE = -1; private static final Logger LOGGER = LogManager.getLogger(); protected final IChannelControlBlock ccb; protected final Queue<ByteBuffer> wiFullQueue; + protected final AtomicInteger ecode = new AtomicInteger(NO_ERROR_CODE); protected boolean channelWritabilityState; protected final int channelId; protected IBufferAcceptor eba; protected int credits; protected boolean eos; protected boolean eosSent; - protected int ecode; protected boolean ecodeSent; protected ByteBuffer currentWriteBuffer; private final ICloseableBufferAcceptor fba; @@ -56,7 +57,6 @@ credits = 0; eos = false; eosSent = false; - ecode = -1; ecodeSent = false; } @@ -78,10 +78,7 @@ if (eos && !eosSent) { return true; } - if (ecode >= 0 && !ecodeSent) { - return true; - } - return false; + return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent; } @Override @@ -138,7 +135,7 @@ return; } eos = true; - if (ecode != REMOTE_ERROR_CODE) { + if (ecode.get() != REMOTE_ERROR_CODE) { adjustChannelWritability(); } } @@ -146,8 +143,11 @@ @Override public void error(int ecode) { + AbstractChannelWriteInterface.this.ecode.set(ecode); + if (ecode == CONNECTION_LOST_ERROR_CODE) { + return; + } synchronized (ccb) { - AbstractChannelWriteInterface.this.ecode = ecode; adjustChannelWritability(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java index a546349..31a37ef 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java @@ -200,7 +200,7 @@ for (int i = 0; i < ccbArray.length; ++i) { ChannelControlBlock ccb = ccbArray[i]; if (ccb != null) { - ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE); + ccb.reportRemoteError(AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE); markEOSAck(i); unmarkPendingCredits(i); } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java index 17b70a8..628007d 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java @@ -50,10 +50,10 @@ } else { adjustChannelWritability(); } - } else if (ecode >= 0 && !ecodeSent) { + } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) { writerState.getCommand().setChannelId(channelId); writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR); - writerState.getCommand().setData(ecode); + writerState.getCommand().setData(REMOTE_ERROR_CODE); writerState.reset(null, 0, null); ecodeSent = true; ccb.reportLocalEOS(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2892 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic25f05ac2c0d02699324f2d1b80c51f392654106 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
