Repository: spark Updated Branches: refs/heads/master 8c21170de -> dd242bad3
[SPARK-21525][STREAMING] Check error code from supervisor RPC. The code was ignoring the error code from the AddBlock RPC, which means that a failure to write to the WAL was being ignored by the receiver, and would lead to the block being acked (in the case of the Flume receiver) and data potentially lost. Author: Marcelo Vanzin <van...@cloudera.com> Closes #20161 from vanzin/SPARK-21525. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd242bad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd242bad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd242bad Branch: refs/heads/master Commit: dd242bad39cc6df7ff6c6b16642bdc92dccca6ac Parents: 8c21170 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Wed Jan 31 11:48:19 2018 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Wed Jan 31 11:48:19 2018 -0800 ---------------------------------------------------------------------- .../apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 4 +++- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dd242bad/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 27644a6..5d38c56 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -159,7 +159,9 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) - trackerEndpoint.askSync[Boolean](AddBlock(blockInfo)) + if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) { + throw new SparkException("Failed to add block to receiver tracker.") + } logDebug(s"Reported block $blockId") } http://git-wip-us.apache.org/repos/asf/spark/blob/dd242bad/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 6f130c8..c74ca19 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -521,7 +521,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (active) { context.reply(addBlock(receivedBlockInfo)) } else { - throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.") + context.sendFailure( + new IllegalStateException("ReceiverTracker RpcEndpoint already shut down.")) } } }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org