abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2806
Change subject: [NO ISSUE][RT] Abort tasks on local network failures
......................................................................
[NO ISSUE][RT] Abort tasks on local network failures
- user model changes: no
- storage format changes: no
- interface changes: yes
Add error code to IInputChannelMonitor.notifyFailure
Details:
- Previously, there was an assumption that all failures
reported to an IInputChannelMonitor come from a remote
task.
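- This assumption is not always true: a failure can also originate
locally (e.g., a local network failure). In that case the receiver
returns silently, expecting the remote task to report the failure,
and if no remote task actually failed, the job could hang.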
- To fix this, we report an error code indicating whether
the failure is local or remote. If the failure is local,
we fail the local task and report the failure to the
cluster controller (CC).
Change-Id: I7ea5b9008383faaac7c563671242b03919090b35
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
13 files changed, 47 insertions(+), 24 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/06/2806/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
index 52509d3..559f49a 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
@@ -19,9 +19,9 @@
package org.apache.hyracks.api.channels;
public interface IInputChannelMonitor {
- public void notifyFailure(IInputChannel channel);
+ void notifyFailure(IInputChannel channel, int errorCode);
- public void notifyDataAvailability(IInputChannel channel, int nFrames);
+ void notifyDataAvailability(IInputChannel channel, int nFrames);
- public void notifyEndOfStream(IInputChannel channel);
+ void notifyEndOfStream(IInputChannel channel);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b6d7cc7..09193d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -148,6 +148,7 @@
public static final int CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT
= 112;
public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113;
public static final int NODE_IS_NOT_ACTIVE = 114;
+ public static final int LOCAL_NETWORK_ERROR = 115;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index 36c77ce..b1566f4 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -199,7 +199,7 @@
}
@Override
- public synchronized void notifyFailure(IInputChannel channel) {
+ public synchronized void notifyFailure(IInputChannel channel, int
errorCode) {
failed = true;
notifyAll();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index 44c3d36..0f96a6e 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -141,7 +141,7 @@
@Override
public void error(int ecode) {
- monitor.notifyFailure(DatasetNetworkInputChannel.this);
+ monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index a831492..7e893f8 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -133,7 +133,7 @@
@Override
public void error(int ecode) {
- monitor.notifyFailure(NetworkInputChannel.this);
+ monitor.notifyFailure(NetworkInputChannel.this, ecode);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 3016a7a..8bee56e 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -96,7 +96,7 @@
@Override
public void fail() throws HyracksDataException {
-
ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
+
ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
}
@Override
@@ -105,7 +105,7 @@
}
public void abort() {
-
ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
+
ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
synchronized (NetworkOutputChannel.this) {
aborted = true;
NetworkOutputChannel.this.notifyAll();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 23a5abb..340924d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -309,7 +309,8 @@
if (!addPendingThread(thread)) {
return;
}
- thread.setName(displayName + ":" +
taskAttemptId + ":" + cIdx);
+ thread.setName(
+ displayName + ":" + joblet.getJobId()
+ ":" + taskAttemptId + ":" + cIdx);
thread.setPriority(Thread.MIN_PRIORITY);
try {
pushFrames(collector,
inputChannelsFromConnectors.get(cIdx), writer);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index c902ad8..5ffed7e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -103,7 +103,7 @@
}
@Override
- public synchronized void notifyFailure(IInputChannel channel) {
+ public synchronized void notifyFailure(IInputChannel channel, int
errorCode) {
failed.set(true);
notifyAll();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index bf9f575..745dfa9 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -25,8 +25,10 @@
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,28 +40,35 @@
private boolean eos;
- private boolean failed;
+ private int errorCode;
public InputChannelFrameReader(IInputChannel channel) {
this.channel = channel;
availableFrames = 0;
+ errorCode = AbstractChannelWriteInterface.NO_ERROR_CODE;
eos = false;
- failed = false;
}
@Override
public void open() throws HyracksDataException {
}
+ private boolean hasFailed() {
+ return errorCode != AbstractChannelWriteInterface.NO_ERROR_CODE;
+ }
+
private synchronized boolean canGetNextBuffer() throws
HyracksDataException {
- while (!failed && !eos && availableFrames <= 0) {
+ while (!hasFailed() && !eos && availableFrames <= 0) {
try {
wait();
} catch (InterruptedException e) {
throw HyracksDataException.create(e);
}
}
- if (failed) {
+ if (hasFailed()) {
+ if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) {
+ throw
HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
+ }
// Do not throw exception here to allow the root cause exception
gets propagated to the master first.
// Return false to allow the nextFrame(...) call to be a non-op.
LOGGER.warn("Sender failed.. returning silently");
@@ -116,8 +125,11 @@
}
@Override
- public synchronized void notifyFailure(IInputChannel channel) {
- failed = true;
+ public synchronized void notifyFailure(IInputChannel channel, int
errorCode) {
+ // Note: if a remote failure overwrites a local error code stored in errorCode,
then we rely on
+ // the fact that the remote task will notify the cc of the failure.
+ // Otherwise, the local task must fail
+ this.errorCode = errorCode;
notifyAll();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 3c0a06b..5a1d5f8 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -23,8 +23,10 @@
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,6 +48,8 @@
private final BitSet closedSenders;
private int lastReadSender;
+
+ private boolean localFailure;
public NonDeterministicChannelReader(int nSenderPartitions, BitSet
expectedPartitions) {
this.nSenderPartitions = nSenderPartitions;
@@ -107,6 +111,9 @@
}
if (!failSenders.isEmpty()) {
LOGGER.warn("Sender failed.. returning silently");
+ if (localFailure) {
+ throw
HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
+ }
// Do not throw exception here to allow the root cause
exception gets propagated to the master first.
// Return a negative value to allow the nextFrame(...) call to
be a non-op.
return -1;
@@ -141,11 +148,15 @@
}
@Override
- public synchronized void notifyFailure(IInputChannel channel) {
+ public synchronized void notifyFailure(IInputChannel channel, int
errorCode) {
PartitionId pid = (PartitionId) channel.getAttachment();
int senderIndex = pid.getSenderIndex();
LOGGER.warn("Failure: " + pid.getConnectorDescriptorId() + " sender: "
+ senderIndex + " receiver: "
+ pid.getReceiverIndex());
+ // Note: if a remote failure overwrites the value of localFailure,
then we rely on
+ // the fact that the remote task will notify the cc of the failure.
+ // Otherwise, the local task must fail
+ localFailure = errorCode ==
AbstractChannelWriteInterface.LOCAL_ERROR_CODE;
failSenders.set(senderIndex);
eosSenders.set(senderIndex);
notifyAll();
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
index ff8d451..31cb69f 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
@@ -42,10 +42,6 @@
}
}
- public void reportError(int ecode) {
- fba.error(ecode);
- }
-
@Override
public void setFullBufferAcceptor(ICloseableBufferAcceptor
fullBufferAcceptor) {
fba = fullBufferAcceptor;
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 28c1a71..7ed9bfa 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,7 +31,9 @@
public abstract class AbstractChannelWriteInterface implements
IChannelWriteInterface {
- public static final int REMOTE_WRITE_ERROR_CODE = 1;
+ public static final int NO_ERROR_CODE = 0;
+ public static final int REMOTE_ERROR_CODE = 1;
+ public static final int LOCAL_ERROR_CODE = -1;
private static final Logger LOGGER = LogManager.getLogger();
protected final IChannelControlBlock ccb;
protected final Queue<ByteBuffer> wiFullQueue;
@@ -136,7 +138,7 @@
return;
}
eos = true;
- if (ecode != REMOTE_WRITE_ERROR_CODE) {
+ if (ecode != REMOTE_ERROR_CODE) {
adjustChannelWritability();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index bd42560..c805d78 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -200,7 +200,7 @@
for (int i = 0; i < ccbArray.length; ++i) {
ChannelControlBlock ccb = ccbArray[i];
if (ccb != null && !ccb.getRemoteEOS()) {
- ccb.reportRemoteError(-1);
+
ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE);
markEOSAck(i);
unmarkPendingCredits(i);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2806
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7ea5b9008383faaac7c563671242b03919090b35
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>