Hi, I'm quite new to Spark Streaming and developed the following application to pass 4 strings, process them and shut down:
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[1]") // run locally with one thread
      .setAppName("Spark Streaming with Scala") // name in Spark web UI
    val ssc = new StreamingContext(conf, Seconds(5))
    val stream: ReceiverInputDStream[String] = ssc.receiverStream(
      new Receiver[String](StorageLevel.MEMORY_ONLY_SER_2) {
        def onStart() {
          println("[ACTIVATOR] onStart called")
          store("one")
          store("two")
          store("three")
          store("four")
          stop("No more data...receiver stopped")
        }

        def onStop() {
          println("[ACTIVATOR] onStop called")
        }
      }
    )
    stream.count().map(cnt => "Received " + cnt + " events.").print()

    ssc.start()
    // ssc.awaitTermination(1000)
    val stopSparkContext, stopGracefully = true
    ssc.stop(stopSparkContext, stopGracefully)

I'm running it with `xsbt 'runMain StreamingApp'`, with both xsbt and Spark built from the latest sources. What I noticed is that the app generates the following output:

    14/05/18 22:32:55 INFO DAGScheduler: Completed ResultTask(1, 0)
    14/05/18 22:32:55 INFO DAGScheduler: Stage 1 (take at DStream.scala:593) finished in 0.245 s
    14/05/18 22:32:55 INFO SparkContext: Job finished: take at DStream.scala:593, took 4.829798 s
    -------------------------------------------
    Time: 1400445170000 ms
    -------------------------------------------

    14/05/18 22:32:55 INFO DAGScheduler: Completed ResultTask(3, 0)
    14/05/18 22:32:55 INFO DAGScheduler: Stage 3 (take at DStream.scala:593) finished in 0.022 s
    14/05/18 22:32:55 INFO SparkContext: Job finished: take at DStream.scala:593, took 0.194738 s
    -------------------------------------------
    Time: 1400445175000 ms
    -------------------------------------------

    14/05/18 22:33:00 INFO DAGScheduler: Completed ResultTask(5, 0)
    14/05/18 22:33:00 INFO DAGScheduler: Stage 5 (take at DStream.scala:593) finished in 0.014 s
    14/05/18 22:33:00 INFO SparkContext: Job finished: take at DStream.scala:593, took 0.319387 s
    -------------------------------------------
    Time: 1400445180000 ms
    -------------------------------------------

Why did three jobs finish? I would expect one, since the app calls `stop` immediately after the `store` calls. Can I have a single job that processes these 4 `store`s?

Jacek

--
Jacek Laskowski | http://blog.japila.pl
"Never discourage anyone who continually makes progress, no matter how slow." Plato