[
https://issues.apache.org/jira/browse/SPARK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068814#comment-14068814
]
Kousuke Saruta edited comment on SPARK-2583 at 7/21/14 5:46 PM:
Hi [~pwendell],
When I simulated a disk fault during shuffle, I saw the following behavior.
I deleted a shuffle bucket file on the remote executor.
The remote executor then threw a FileNotFoundException,
but the fetching executor kept waiting,
probably on future.onSuccess in BasicBlockFetcherIterator#sendRequest.
I think BasicBlockFetcherIterator expects an ack message from ConnectionManager
through BlockManagerWorker, but ConnectionManager#handleMessage doesn't reply
back when a FileNotFoundException (and perhaps some other exceptions) is thrown.
This is because BlockManagerWorker#onBlockMessageReceive returns None even if
an error has occurred.
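To make the failure mode concrete, here is a minimal, simplified Scala model of it. It assumes, as described above, that no usable reply reaches the fetcher when the callback returns None. AckHangSketch, the Boolean parameter of onBlockMessageReceive, and fetch are hypothetical stand-ins, not Spark's actual classes or signatures. A timeout is added only so the sketch terminates; in the behavior described above the fetcher simply keeps waiting.

{code}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical model of the hang; none of these names are Spark APIs.
object AckHangSketch {
  // Stand-in for BlockManagerWorker#onBlockMessageReceive: on error
  // (the bucket file is missing) it returns None, i.e. "no message".
  def onBlockMessageReceive(fileExists: Boolean): Option[String] =
    if (fileExists) Some("block-data") else None

  // Stand-in for the reply path: the fetcher's promise is completed
  // only when the callback produced a message.
  def handleMessage(reply: Promise[String], fileExists: Boolean): Unit =
    onBlockMessageReceive(fileExists).foreach(msg => reply.success(msg))

  // Fetcher side: true if a reply arrived within the timeout.
  // Await.ready throws TimeoutException otherwise, which Try turns into false.
  def fetch(fileExists: Boolean, timeout: FiniteDuration): Boolean = {
    val reply = Promise[String]()
    handleMessage(reply, fileExists)
    Try(Await.ready(reply.future, timeout)).isSuccess
  }
}
{code}

With fileExists = false nothing ever completes the promise, so fetch only returns (false) because of the artificial timeout; that is the hang in miniature.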
Considering this case, I think the following 3 improvements are needed.
1) BlockManagerWorker#onBlockMessageReceive should not return None when an error
has occurred. None means "no message", but an error is a message that
ought to be replied back.
2) ConnectionManager should not reply with an empty message when an error has
occurred.
3) When onReceiveCallback throws an exception, ConnectionManager should reply
back that an error has occurred. In this case onReceiveCallback doesn't throw
an exception but returns None, but I think other callback functions may throw
exceptions.
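A rough sketch of how the 3 improvements could fit together, assuming an ack that carries an explicit error field. Request, Ack, ErrorAwareHandler and serveBlock are hypothetical names for illustration only; Spark's real Message/BufferMessage classes look different, and this is not a proposed patch.

{code}
import java.io.FileNotFoundException
import scala.util.{Failure, Success, Try}

// Hypothetical message types; an ack either carries a payload or an error.
final case class Request(id: Int, blockId: String)
final case class Ack(ackId: Int, payload: Option[String], error: Option[String]) {
  def hasError: Boolean = error.isDefined
}

object ErrorAwareHandler {
  type Callback = Request => Option[String]

  // 1) The block-serving callback reports the failure (here by throwing)
  //    instead of returning None.
  def serveBlock(available: Set[String])(req: Request): Option[String] =
    if (available.contains(req.blockId)) Some(s"data-of-${req.blockId}")
    else throw new FileNotFoundException(req.blockId)

  // 2) + 3) Any non-fatal exception from the callback becomes an ack with
  //    the error set, so the peer is never left with an empty reply that
  //    looks like success.
  def handleMessage(req: Request, callback: Callback): Ack =
    Try(callback(req)) match {
      case Success(payload) => Ack(req.id, payload, error = None)
      case Failure(e)       => Ack(req.id, payload = None, error = Some(e.toString))
    }
}
{code}

The fetching side can then check hasError and fail (or retry) the fetch, instead of waiting for data that will never arrive.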
was (Author: sarutak):
Hi [~pwendell],
When I simulated a disk fault during shuffle, I saw the following 2 behaviors.
#1 is related to this topic and #2 is about this topic.
1) Fetching locally from the executor itself
To simulate a disk fault, I deleted a bucket file.
FileNotFoundException was thrown and
after retry,
2)
> ConnectionManager cannot distinguish whether error occurred or not
> --
>
> Key: SPARK-2583
> URL: https://issues.apache.org/jira/browse/SPARK-2583
> Project: Spark
> Issue Type: Bug
>Reporter: Kousuke Saruta
>Assignee: Kousuke Saruta
>Priority: Critical
>
> ConnectionManager#handleMessage sends empty messages to the other peer whether
> or not an error occurred in onReceiveCallback.
> {code}
> val ackMessage = if (onReceiveCallback != null) {
>   logDebug("Calling back")
>   onReceiveCallback(bufferMessage, connectionManagerId)
> } else {
>   logDebug("Not calling back as callback is null")
>   None
> }
> if (ackMessage.isDefined) {
>   if (!ackMessage.get.isInstanceOf[BufferMessage]) {
>     logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
>       + ackMessage.get.getClass)
>   } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
>     logDebug("Response to " + bufferMessage + " does not have ack id set")
>     ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
>   }
> }
> // We have no way to tell peer whether error occurred or not
> sendMessage(connectionManagerId, ackMessage.getOrElse {
>   Message.createBufferMessage(bufferMessage.id)
> })
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)