Hi Hannes,

Good to know I'm not alone in this boat. Sorry about not posting back; I
haven't been on the user list in a while.

It's on my agenda to get past this issue; it will be very important for our
recovery implementation. I have done an internal proof of concept, but
without any firm conclusions so far.

The main approach is to have full control over offsets: after each processed
batch we persist the last processed event (I'm using Kafka, by the way) and
keep its offset somewhere, so that upon recovery we only resume streaming
from the last processed one. This somewhat conflicts with the new
ReliableReceiver implementation, where that control is taken away from the
processing layer...
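
To make that concrete, the rough shape I have in mind is something like the
sketch below. Event, OffsetStore and the IO step are all made-up names for
illustration, not the actual implementation; the real thing would hook into
whatever external store you keep offsets in (ZooKeeper, a database, etc.):

import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark prior to 1.3)
import org.apache.spark.streaming.dstream.DStream

// Hypothetical event type: each record carries its Kafka partition and offset.
case class Event(partition: Int, offset: Long, payload: String)

// Hypothetical external store for "last processed offset per partition".
trait OffsetStore {
  def save(offsets: Map[Int, Long]): Unit
  def load(): Map[Int, Long]
}

def process(stream: DStream[Event], store: OffsetStore): Unit = {
  stream.foreachRDD { rdd =>
    // 1. Do the actual side-effecting work for this batch.
    rdd.foreachPartition { events =>
      events.foreach { e => () /* IO to downstream systems would go here */ }
    }
    // 2. Persist the highest offset fully processed in each partition, so that
    //    after a restart we resume right after the last processed event.
    val latest = rdd.map(e => (e.partition, e.offset))
                    .reduceByKey((a, b) => a max b)
                    .collect().toMap
    if (latest.nonEmpty) store.save(latest)
  }
}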
When recovering Spark Streaming, we need to control the recovered batches so
that only internal state gets updated and no IO gets executed. For this we
need to make internal changes to Spark Streaming itself.

I exposed a function that identifies how many batches are being recovered.
I then pass that count to the workers up front, and with a counter they know
how many batches are being recomputed, so they can avoid re-executing the IO
for those. This is still very much at an embryonic stage, so I can't actually
help you much right now...
This is the function I've created inside the JobGenerator class to access the
recovered batches:

def getDownTimes(): Seq[Time] = {
  if (ssc.isCheckpointPresent) {
    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and the current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    downTimes
  } else {
    Seq.empty[Time]
  }
}
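
On the worker side, the counter idea is roughly the sketch below. It is only
a sketch: RecoveryGate is a made-up name, how the recovered-batch count
reaches the executors (broadcast variable, config, etc.) is still an open
question in my proof of concept, and it assumes runUnlessReplaying is invoked
exactly once per batch on each worker:

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical per-JVM gate: skip IO while the recovered batches are replayed.
object RecoveryGate {
  // Set on restart from getDownTimes().size and shipped to the executors.
  @volatile var recoveredBatches: Int = 0

  private val replayed = new AtomicInteger(0)

  // Call once per batch; `io` only runs for batches beyond the recovered ones.
  def runUnlessReplaying(io: => Unit): Unit = {
    if (replayed.getAndIncrement() < recoveredBatches) {
      // Recomputed batch: internal state gets updated upstream, IO is skipped.
    } else {
      io
    }
  }
}

The per-batch IO inside foreachRDD would then be wrapped in
RecoveryGate.runUnlessReplaying { ... }, so recovered batches only rebuild
state while live batches execute their side effects.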

It has been a while since I last visited this issue, so I probably can't give
you many details right now, but I expect to arrive at a concrete solution
that I could ultimately push as a proposal to the Spark dev team.

I will definitely notify people on this thread at least.

Thanks,
Rod





