Let me take a stab at your questions – can you clarify some of the points 
below? I’m wondering if you’re using the streaming concepts as they were 
intended…

1. Windowed operations

First, I just want to confirm that it is your intention to split the original 
Kafka stream into multiple DStreams – and that grouping by key or 
repartitioning using your Map[Int, List[String]] is not enough.
I have never come across a situation where I needed to do this, so I’m 
wondering if what you need is a simple groupBy / reduceByKey or similar.

Putting that aside, your code below suggests that you're "windowing" (is that a 
word? :)) the stream twice. You call window on the Kafka stream and then you 
"reduce by key and window" the resulting stream.
Again, I'm wondering whether your intention isn't simply to "reduce by key and 
window" once, directly on the source stream – see the sketch below.
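A rough sketch of what I have in mind (eventKey and mergeEvents are
placeholders I made up for illustration, not functions from your code):

// Key each event by the Int it belongs to, then do a single
// reduceByKeyAndWindow instead of window() followed by reduceByKeyAndWindow().
val keyed: DStream[(Int, KafkaGenericEvent)] =
  kafkaDStream.flatMap { case (_, event) =>
    // eventKey: KafkaGenericEvent => Seq[Int] is hypothetical – however you
    // map an event to the Int keys from your Map[Int, List[String]]
    eventKey(event).map(k => (k, event))
  }

// mergeEvents: (KafkaGenericEvent, KafkaGenericEvent) => KafkaGenericEvent
// is also a placeholder reduce function.
val reduced = keyed.reduceByKeyAndWindow(
  mergeEvents _,
  Minutes(WINDOW_DURATION),
  Minutes(SLIDER_DURATION))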

Another thing – you're counting your RDDs based on the slide duration of the 
window, but that is not correct. A DStream produces one RDD per "batch 
interval", which is constant across the streaming context (e.g. 10 seconds, 1 
minute, whatever). Both the window duration and the slide interval need to be 
multiples of this batch interval.
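To make that concrete (the numbers here are just an example):

// The batch interval is fixed when the StreamingContext is created...
val ssc = new StreamingContext(sparkConf, Minutes(1)) // batch interval = 1 minute

// ...and every window/slide must be a multiple of it:
// window = 16 minutes (16 batches), slide = 1 minute (1 batch) – both OK here.
val windowed = kafkaDStream.window(Minutes(16), Minutes(1))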

2. Checkpointing

First of all, you should call checkpoint on the streaming context: 
ssc.checkpoint(checkpointDir) – where checkpointDir needs to be a local folder 
in local mode and an HDFS path in cluster mode.
This is probably where the error comes from.
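Something like this (the paths are just examples):

// local mode: any local folder works
ssc.checkpoint("/tmp/spark-checkpoints")

// cluster mode: use a fault-tolerant store such as HDFS
// ssc.checkpoint("hdfs://namenode:8020/user/vamsi/spark-checkpoints")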

Second – Kafka is backed by durable storage, so you don't need to checkpoint 
its contents: it can always replay events in case of failure. You could 
persist it if you go through the same data multiple times, as a performance 
enhancement, but you don't have to.
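If you do read the same batches several times, persisting is enough, e.g.:

import org.apache.spark.storage.StorageLevel

// optional: cache the raw stream only if several computations consume it
kafkaDStream.persist(StorageLevel.MEMORY_ONLY_SER)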

Third – the windowed operation NEEDS to checkpoint data, as it's stateful – all 
the stateful operations call persist internally, as you've seen, to avoid 
recreating the full state from the original events in case of failure. Doing 
that for a window of 1 hour could take way too long.

Last but not least – when you call checkpoint(interval) you can choose to 
checkpoint more often or less often than the default. See the checkpointing 
docs <http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>
for more info:


For stateful transformations that require RDD checkpointing, the default 
interval is a multiple of the batch interval that is at least 10 seconds. It 
can be set by using dstream.checkpoint(checkpointInterval). Typically, a 
checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting 
to try.
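So with a slide interval of 1 minute, something in the 5-10 minute range is a
reasonable starting point, e.g. (reducedStream standing in for the DStream
returned by your reduceByKeyAndWindow):

// slide interval = 1 minute, so this checkpoints every 5 sliding intervals
reducedStream.checkpoint(Minutes(5))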

Hope this helps,
-adrian

From: srungarapu vamsi
Date: Wednesday, September 23, 2015 at 10:51 PM
To: user
Subject: reduceByKeyAndWindow confusion

I create a stream from Kafka as below:

val kafkaDStream = KafkaUtils
  .createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
    ssc, kafkaConf, Set(topics))
  .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int, List[String]].
Using this map I am filtering the stream and finally converting it into a
Map[Int, DStream[KafkaGenericEvent]]

1.
Now, on this map, for each and every value (which is a 
DStream[KafkaGenericEvent]) I am applying a reduceByKeyAndWindow operation.
But since we have to give a window duration and slide duration even in 
reduceByKeyAndWindow, does that imply that on every window of the given 
DStream, reduceByKeyAndWindow can be applied with a different window duration 
and slide duration?
I.e. let's say the windowed DStream is created with window duration -> 16 
minutes and slide duration -> 1 minute, so I have one RDD for every window.
For reduceByKeyAndWindow, if we have a window duration of 4 minutes and a slide 
duration of 1 minute, will I then get 4 RDDs, since 
windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4?

2.
As suggested in the Spark docs, I am trying to set a checkpointing interval on 
the kafkaDStream created in the block shown above, in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:
"WindowedDStream has been marked for checkpointing but the storage level has 
not been set to enable persisting. Please use DStream.persist() to set the 
storage level to use memory for better checkpointing performance"
But when I went through the implementation of the checkpoint function in 
DStream.scala, I see a call to the persist() function.
So do I really have to call persist() on the WindowedDStream myself?
Just to give it a shot, I called the persist method on the windowedDStream and 
then called checkpoint(interval). Even then I am facing the above-mentioned 
error.
How do I solve this?
--
/Vamsi
