Hi,
To test the resiliency of Kafka Spark Streaming, I killed the worker that
was reading from the Kafka topic. The driver was unable to replace the
worker, and from that point on the job became a rogue job that keeps
running but does nothing.
Is this a known issue? Are there any workarounds?
Here is the exception that I see on the driver:
2014-08-21 03:43:22,163 [spark-akka.actor.default-dispatcher-16] WARN org.apache.spark.streaming.scheduler.ReceiverTracker - Error reported by receiver for stream 0: Error in block pushing thread - org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = org.apache.spark.storage.BlockManagerMessages$UpdateBlockInfo@5fc95796]
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251)
    at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:68)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:283)
    at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:259)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:866)
    at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:267)
    at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:256)
    at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:179)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:80)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:113)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:96)
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:139)
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:112)
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:57)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:237)
    ... 18 more
Thanks,
Bharat