Tathagata:

I don't believe Kafka streams are being shut down cleanly, which implies that 
the most recent Kafka offsets are not being committed back to Zookeeper, which 
in turn implies that starting or restarting a Spark Streaming process would 
result in duplicate events.

The simple Spark Streaming code (running in local mode) pasted at the end of 
this e-mail, which uses a hard-coded queueStream as its only input stream, 
exits cleanly once it detects the sentinel file. However, if the queueStream is 
replaced with a kafkaStream, the process never exits (unless I put a 
System.exit() as the very last line to forcibly kill all threads).
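
For concreteness, the substitution I'm making looks roughly like this -- a 
sketch only, since I'm assuming the kafkaStream overload that takes the 
Zookeeper quorum, consumer group, and a topic-to-thread-count map, and the 
exact signature on this branch may differ:

// Replaces the queueStream in the code below; variable names follow the args
// parsed in main(). Assumes a kafkaStream(zkQuorum, group, topics) overload.
val kafkaInput = ssc.kafkaStream(
  zkQuorum,                                      // e.g. "localhost:2181"
  group,                                         // consumer group id
  topics.split(",").map(t => (t, numThreads.toInt)).toMap)
kafkaInput.foreach(rdd => println("receivedStop[" + receivedStop + "]"))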

In attempting to understand the Kafka shutdown process, I traced through the 
Spark Streaming codebase with println()s. I noticed the following:

1. Although KafkaInputDStream.scala initializes the class member variable 
consumerConnector in onStart(), I don't see a corresponding 
consumerConnector.shutdown() anywhere, such as in onStop(). It is my 
understanding that it is the consumer's shutdown() that commits the offsets 
back to Zookeeper. See the Kafka example at 
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#ConsumerGroupExample-FullSourceCode

2. There is a similar apparent asymmetry with executorPool, which is never 
released in onStop(). (A further minor encumbrance is that it is a variable 
local to onStart() rather than a class member.) A sketch of the onStop() I 
would have expected, covering both #1 and #2, follows this list.

3. Through my println() tracing and Akka debug-level logging, I'm not seeing 
NetworkReceiverActor ever receive a StopReceiver message from 
ReceiverExecutor. From some poking around and testing, it seems possible to 
successfully send a message of any type to NetworkReceiverActor only prior to 
that NetworkReceiverActor being serialized into an RDD on line 146 of 
NetworkInputTracker.scala:
https://github.com/mesos/spark/blob/branch-0.8/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala#L146

In other words, messages sent to the actor before it is put into the RDD are 
delivered, but messages sent afterward are not. Is it possible that Akka actors 
are intolerant of being serialized? (A quick Spark-independent way I've been 
probing this is sketched further below.)

4. I noticed a lot of "TODO" comments sprinkled throughout the code relating to 
shutdown/termination/cleanup.
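
To make #1 and #2 concrete, here is a rough sketch of the kind of onStop() I 
would have expected in KafkaInputDStream. This is illustrative only, not a 
patch: the member names mirror the existing code, and executorPool would first 
need to be promoted from a local in onStart() to a class member.

import java.util.concurrent.{ExecutorService, TimeUnit}
import kafka.consumer.ConsumerConnector

class KafkaReceiverShutdownSketch {
  var consumerConnector: ConsumerConnector = null   // created in onStart()
  var executorPool: ExecutorService = null          // currently local to onStart()

  def onStop() {
    if (consumerConnector != null) {
      // My understanding is that shutdown() is what commits the most recent
      // offsets back to Zookeeper (cf. the Kafka ConsumerGroupExample).
      consumerConnector.shutdown()
      consumerConnector = null
    }
    if (executorPool != null) {
      executorPool.shutdown()                        // no new MessageHandler tasks
      executorPool.awaitTermination(10, TimeUnit.SECONDS)
      executorPool = null
    }
  }
}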

My biggest concern is #3 above, because if my suppositions are correct, then 
there might be some major re-architecting involved. The other issues I could 
probably fix on my own and commit back.
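
As a quick, Spark-independent way of probing the serialization question in #3, 
I've been using a small standalone program along these lines (assuming the 
Akka 2.x bundled with this branch; the Echo actor is illustrative, not 
NetworkReceiverActor itself):

import java.io._
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Send to an ActorRef, round-trip it through plain Java serialization --
// roughly how it gets captured into the RDD -- and see if messages still arrive.
object ActorSerializationProbe extends App {
  class Echo extends Actor {
    def receive = { case msg => println("Echo received: " + msg) }
  }

  val system = ActorSystem("probe")
  val ref = system.actorOf(Props[Echo], "echo")
  ref ! "before round-trip"                    // arrives as expected

  try {
    val bos = new ByteArrayOutputStream
    new ObjectOutputStream(bos).writeObject(ref)
    val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val ref2 = in.readObject().asInstanceOf[ActorRef]
    ref2 ! "after round-trip"                  // does this ever arrive?
  } catch {
    // Serializing/deserializing an ActorRef outside its ActorSystem's
    // serialization scope may simply fail, which would also be consistent
    // with what I'm seeing.
    case e: Exception => println("round-trip failed: " + e)
  }

  Thread.sleep(2000)
  system.shutdown()
}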


import spark.streaming._
import scala.collection.mutable.Queue

object SimpleSparkStreaming {
  @volatile var receivedStop = false
  val sentinelFile = new java.io.File("/home/mmalak/stop")

  def main(args: Array[String]) {
    val Array(master, zkQuorum, broker, group, topics, numThreads) = args

    // Remove any leftover sentinel file from a previous run
    sentinelFile.delete

    val ssc = new StreamingContext(master, "SimpleBeta", Seconds(1),
      System.getenv("SPARK_HOME"), Seq("./target/scala-2.9.2/my.jar"))
    ssc.checkpoint("/home/mmalak/checkpointing")

    // Hard-coded queueStream containing a single one-element RDD; swapping
    // this out for a kafkaStream is what keeps the process from ever exiting.
    val rddQueue = new Queue[spark.RDD[Int]]
    rddQueue += ssc.sparkContext.makeRDD(List(1))
    ssc.queueStream(rddQueue).foreach(rdd =>
      println("receivedStop[" + receivedStop + "]"))
    ssc.start()

    // Poll for the sentinel file, then stop the StreamingContext
    while (!sentinelFile.exists) { Thread.sleep(1000) }
    println("Stop detected")
    receivedStop = true
    ssc.stop()
    println("Exiting main")
  }
}
