Tathagata:
I don't believe Kafka streams are being shut down cleanly, which implies that the most recent Kafka offsets are not being committed back to Zookeeper, which in turn implies that starting or restarting a Spark Streaming process would result in duplicate events.

The simple Spark Streaming code (running in local mode) pasted at the end of this e-mail, which uses a hard-coded queueStream as its only input stream, exits cleanly once it detects the presence of the sentinel file. However, if the queueStream is replaced with a kafkaStream (the line I swap in is shown after the listing), the process never exits -- unless I put a System.exit() as the very last line to forcibly kill all threads.

In attempting to understand the Kafka shutdown process, I traced through the Spark Streaming codebase with println()s and noticed the following:

1. Although KafkaInputDStream.scala initializes the class member variable consumerConnector in onStart(), I don't see a corresponding consumerConnector.shutdown() anywhere, such as in onStop(). It is my understanding that the consumer's shutdown() is what commits the offsets back to Zookeeper. See the Kafka example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#ConsumerGroupExample-FullSourceCode

2. There is a similar apparent asymmetry with executorPool, which is never shut down in onStop(). (A further minor encumbrance is that it is a variable local to onStart() rather than a class member.) A sketch of the fix I have in mind for #1 and #2 appears after the code listing below.

3. Through my println() tracing and Akka debug-level logging, I never see NetworkReceiverActor receive a StopReceiver message from ReceiverExecutor. From some poking around and testing, it seems that messages of any type can be successfully delivered to NetworkReceiverActor only before that actor is serialized into an RDD on line 146 of NetworkInputTracker.scala (https://github.com/mesos/spark/blob/branch-0.8/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala#L146); after the actor is put into the RDD, messages no longer reach it. Is it possible that Akka actors are intolerant of being serialized?

4. I noticed a lot of "TODO" comments sprinkled throughout the code relating to shutdown/termination/cleanup.

My biggest concern is #3 above, because if my suppositions are correct, some major re-architecting might be involved. The other issues I could probably fix on my own and commit back.

    import spark.streaming._

    object SimpleSparkStreaming {
      // Set by the main thread once the sentinel file appears.
      @volatile var receivedStop = false
      val sentinelFile = new java.io.File("/home/mmalak/stop")

      def main(args: Array[String]) {
        // zkQuorum, broker, group, topics, and numThreads are unused in this
        // queueStream version; they matter only when a kafkaStream is swapped in.
        val Array(master, zkQuorum, broker, group, topics, numThreads) = args
        sentinelFile.delete()

        val ssc = new StreamingContext(master, "SimpleBeta", Seconds(1),
          System.getenv("SPARK_HOME"), Seq("./target/scala-2.9.2/my.jar"))
        ssc.checkpoint("/home/mmalak/checkpointing")

        // Hard-coded single-RDD queueStream standing in for the kafkaStream.
        ssc.queueStream(new scala.collection.mutable.Queue[spark.RDD[Int]] +=
            ssc.sparkContext.makeRDD(List(1)))
          .foreach(rdd => println("receivedStop[" + receivedStop + "]"))

        ssc.start()

        // Poll for the sentinel file, then request a clean shutdown.
        while (!sentinelFile.exists) { Thread.sleep(1000) }
        println("Stop detected")
        receivedStop = true
        ssc.stop()
        println("Exiting main")
      }
    }
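For reference, the never-exiting variant is the same program with the queueStream line replaced by something like the following (I'm quoting from memory, and the exact kafkaStream signature varies between Spark versions, so treat this as a sketch):

    // Hypothetical swap-in: consumes the comma-separated `topics` from args,
    // with numThreads consumer threads per topic, via the given zkQuorum/group.
    ssc.kafkaStream(zkQuorum, group,
        topics.split(",").map(t => (t, numThreads.toInt)).toMap)
      .foreach(rdd => println("receivedStop[" + receivedStop + "]"))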
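Concretely, the fix I have in mind for #1 and #2 would look roughly like this untested sketch, which assumes executorPool is promoted from a local variable in onStart() to a class member so that onStop() can reach it (names mirror the existing KafkaInputDStream):

    // Class members; executorPool promoted from a local in onStart().
    var consumerConnector: kafka.consumer.ConsumerConnector = null
    var executorPool: java.util.concurrent.ExecutorService = null

    def onStop() {
      if (consumerConnector != null) {
        // Per the Kafka consumer group example, shutdown() is what commits
        // the latest offsets back to Zookeeper.
        consumerConnector.shutdown()
        consumerConnector = null
      }
      if (executorPool != null) {
        executorPool.shutdown()  // stop accepting new message-handler tasks
        executorPool = null
      }
    }

Shutting the connector down before the pool should let any in-flight message-handler threads drain on their own; executorPool.shutdownNow() would be the harsher fallback if that turns out to block.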