Let me take a stab at your questions – can you clarify some of the points below? I’m wondering if you’re using the streaming concepts as they were intended…
1. Windowed operations

First, I just want to confirm that it is your intention to split the original Kafka stream into multiple DStreams, and that grouping by key or repartitioning using your Map[Int, List[String]] is not enough. I have never come across a situation where I needed to do this, so I'm wondering if what you need is a simple groupBy / reduceByKey or similar.

Putting that aside, your code below suggests that you're "windowing" (is that a word? :)) the stream twice: you call window on the Kafka stream, and then you "reduce by key and window" the resulting stream. Again, I'm wondering if your intention is not simply to reduce by key and window in one step.

Another thing: you're computing the number of RDDs based on the slide duration of the window, but that is not correct. The number of RDDs is determined by the "batch interval", which is constant across the streaming context (e.g. 10 seconds, 1 minute, whatever). Both the window duration and the slide interval need to be multiples of it.

2. Checkpointing

First of all, you should call checkpoint on the streaming context: ssc.checkpoint(checkpointDir), where the checkpoint dir needs to be a local folder in local mode and an HDFS path in cluster mode. This is probably where the error comes from.

Second, Kafka is backed by durable storage, so you don't need to checkpoint its contents: it can always replay events in case of failure. You could checkpoint it as a performance enhancement if you go through the same data multiple times, but you don't have to.

Third, the windowed operation NEEDS to checkpoint data, as it's stateful. All the stateful operations call persist internally, as you've seen, to avoid recreating the full state from the original events in case of failure; doing that for a window of one hour could take way too long.

Last but not least, when you call checkpoint(interval) you can choose to checkpoint more often or less often than the default value.
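To make both points concrete, here's a minimal sketch of how I'd wire it up. The app name, checkpoint directory, socket source, and durations are all placeholders (the socket stream just stands in for your Kafka stream), and I'm assuming a 1-minute batch interval:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Batch interval: every DStream in this context produces one RDD per minute.
val conf = new SparkConf().setAppName("WindowExample")
val ssc = new StreamingContext(conf, Minutes(1))

// Required for stateful operations: a local folder in local mode,
// an HDFS path in cluster mode.
ssc.checkpoint("/tmp/spark-checkpoints")

// Placeholder source standing in for the Kafka direct stream.
val lines = ssc.socketTextStream("localhost", 9999)
val events = lines.map(word => (word, 1L))

// One reduceByKeyAndWindow instead of window() followed by a second
// windowed reduce. Window and slide durations must both be multiples
// of the 1-minute batch interval.
val counts = events.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b, // reduce function
  Minutes(4),                  // window duration
  Minutes(1)                   // slide duration
)

// Persist the windowed stream, then checkpoint it less often than
// every batch (here: every 5 slide intervals).
counts.persist()
counts.checkpoint(Minutes(5))

counts.print()
ssc.start()
ssc.awaitTermination()
```

The checkpoint interval has to be a multiple of the slide duration; 5 minutes here follows the "5 - 10 sliding intervals" rule of thumb from the docs quoted below.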
See the checkpointing docs<http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing> for more info:

"For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try."

Hope this helps,
-adrian

From: srungarapu vamsi
Date: Wednesday, September 23, 2015 at 10:51 PM
To: user
Subject: reduceByKeyAndWindow confusion

I create a stream from Kafka as below:

val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
  .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int, List[String]]. Using this map I am filtering the stream and finally converting it into a Map[Int, DStream[KafkaGenericEvent]].

1. Now on this map, for each and every value (which is a DStream[KafkaGenericEvent]) I am applying the reduceByKeyAndWindow operation. But since we have to give a window duration and slide duration even in reduceByKeyAndWindow, does that imply that on every window of the given DStream, reduceByKeyAndWindow can be applied with a different window duration and slide duration? I.e. let's say the windowed DStream is created with window duration -> 16 minutes, slide duration -> 1 minute, so I have one RDD for every window. For reduceByKeyAndWindow, if we have a window duration of 4 minutes and a slide duration of 1 minute, then will I get 4 RDDs, since windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4?

2.
As suggested in the Spark docs, I am trying to set a checkpointing interval on the kafkaDStream created in the block shown above, in the following way:

kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:

"WindowedDStream has been marked for checkpointing but the storage level has not been set to enable persisting. Please use DStream.persist() to set the storage level to use memory for better checkpointing performance"

But when I went through the implementation of the checkpoint function in DStream.scala, I see a call to the persist() function. Then do I really have to call persist on the WindowedDStream myself? Just to give it a shot, I made a call to the persist method on the windowed DStream and then made a call to checkpoint(interval). Even then I am facing the above-mentioned error. How do I solve this?

--
/Vamsi