Hi,

I'm quite new to Spark Streaming and developed the following
application to store four strings, process them, and shut down:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[1]") // run locally with one thread
      .setAppName("Spark Streaming with Scala") // name in Spark web UI
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
    val stream: ReceiverInputDStream[String] = ssc.receiverStream(
      new Receiver[String](StorageLevel.MEMORY_ONLY_SER_2) {
        def onStart() {
          println("[ACTIVATOR] onStart called")
          store("one")
          store("two")
          store("three")
          store("four")
          stop("No more data...receiver stopped")
        }

        def onStop() {
          println("[ACTIVATOR] onStop called")
        }
      }
    )
    stream.count().map(cnt => "Received " + cnt + " events.").print()

    ssc.start()
    // ssc.awaitTermination(1000)
    val stopSparkContext, stopGracefully = true // multiple definition: both vals set to true
    ssc.stop(stopSparkContext, stopGracefully)
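
For context, the same shutdown sequence with the commented-out
`awaitTermination` line enabled would look something like this (a
sketch only; the 6000 ms timeout is an assumed value, picked just to
exceed the 5-second batch interval, not something I have tested):

    ssc.start()
    // Block slightly longer than one batch interval so the first
    // batch can be generated before shutdown is requested.
    // 6000 ms is an assumption, not a verified value.
    ssc.awaitTermination(6000)
    ssc.stop(true, true) // stopSparkContext, stopGracefully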

I'm running it with `xsbt 'runMain StreamingApp'`, with both xsbt and
Spark built from the latest sources.
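
For completeness, the sbt build is nothing special; a minimal sketch
would be along these lines (the version strings are assumptions, since
everything is built from recent sources):

    // build.sbt -- minimal sketch; the version strings below are
    // assumptions for a locally published build, not verified values.
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT"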

What I noticed is that the app generates the following log output:

14/05/18 22:32:55 INFO DAGScheduler: Completed ResultTask(1, 0)
14/05/18 22:32:55 INFO DAGScheduler: Stage 1 (take at
DStream.scala:593) finished in 0.245 s
14/05/18 22:32:55 INFO SparkContext: Job finished: take at
DStream.scala:593, took 4.829798 s
-------------------------------------------
Time: 1400445170000 ms
-------------------------------------------

14/05/18 22:32:55 INFO DAGScheduler: Completed ResultTask(3, 0)
14/05/18 22:32:55 INFO DAGScheduler: Stage 3 (take at
DStream.scala:593) finished in 0.022 s
14/05/18 22:32:55 INFO SparkContext: Job finished: take at
DStream.scala:593, took 0.194738 s
-------------------------------------------
Time: 1400445175000 ms
-------------------------------------------

14/05/18 22:33:00 INFO DAGScheduler: Completed ResultTask(5, 0)
14/05/18 22:33:00 INFO DAGScheduler: Stage 5 (take at
DStream.scala:593) finished in 0.014 s
14/05/18 22:33:00 INFO SparkContext: Job finished: take at
DStream.scala:593, took 0.319387 s
-------------------------------------------
Time: 1400445180000 ms
-------------------------------------------

Why are there three jobs finished? I would expect one, since the app
calls `stop` immediately after the `store`s. Can I have a single job
that processes these four `store`s?

Jacek

-- 
Jacek Laskowski | http://blog.japila.pl
"Never discourage anyone who continually makes progress, no matter how
slow." Plato
