We have a Spark Streaming application that has basically zero scheduling
delay for hours, but then suddenly it jumps up to multiple minutes and
spirals out of control (see screenshot of the job manager here:
http://i.stack.imgur.com/kSftN.png).
This happens after a while even if we double the batch
On Fri, Feb 13, 2015 at 2:21 AM, Gerard Maas gerard.m...@gmail.com wrote:
KafkaOutputServicePool
Could you please give some example code of what KafkaOutputServicePool
would look like? When I tried object pooling I ended up with various
NotSerializableException errors.
Thanks!
Josh
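A minimal sketch of one way around the serialization problem (everything here is hypothetical: the stub class body, the topic name, and the singleton pattern are assumptions, not code from the thread). The idea is to never ship the producer inside the closure at all: a Scala `object` with a `lazy val` is initialized on first use inside each executor JVM and is never serialized, so each executor gets exactly one long-lived writer.

```scala
// Stub standing in for the real KafkaOutputService from this thread;
// the real class would wrap a Kafka producer.
class KafkaOutputService(topic: String) {
  def output(rec: Any): Unit = () // real impl would send to Kafka
}

// One writer per JVM: `object` + `lazy val` means the instance is built
// on first access on each executor, never serialized with the closure.
// "my-topic" is a placeholder, not a name from the thread.
object KafkaWriterPool {
  lazy val writer = new KafkaOutputService("my-topic")
}

// Inside foreachPartition you would call KafkaWriterPool.writer.output(rec);
// here we just show that the same instance is reused across accesses.
val a = KafkaWriterPool.writer
val b = KafkaWriterPool.writer
println(a eq b) // true: one producer per JVM, reused across batches
```

The same pattern generalizes to a real pool (e.g. an `ArrayBlockingQueue` of producers) when one writer per executor is not enough.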
1. Can you try count()? take() often does not force the entire computation.
2. Can you share the full log? From the log it seems that the blocks are
added to two nodes but the tasks seem to be launched on different nodes. I
don't see any message removing the blocks, so I need the whole log to debug.
Hi Tim,
I think this code will still introduce a shuffle even when you call
repartition on each input stream. Actually, this style of implementation
will generate more jobs (a job per input stream) than unioning into one
stream with DStream.union(), and union normally has no special
overhead as
Hi Gerard,
Great write-up and really good guidance in there.
I have to be honest, I don't know why, but setting the # of partitions for
each DStream to a low number (5-10) just causes the app to choke/crash.
Setting it to 20 gets the app going, but with not-so-great delays. Bump it
up to 30 and I
Hey Tim,
Let me get the key points.
1. If you are not writing back to Kafka, the delay is stable? That is,
instead of foreachRDD { // write to kafka }, if you do dstream.count(),
then the delay is stable. Right?
2. If so, then Kafka is the bottleneck. Is the number of partitions that
you spoke of
1) Yes, if I disable writing out to Kafka and replace it with some very
lightweight action like rdd.take(1), the app is stable.
2) The partitions I spoke of in the previous mail are the number of
partitions I create from each DStream. But yes, since I do processing and
writing out per partition,
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
  val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
  writer.output(rec)
}) )
So this is creating a new Kafka producer for every new
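To see why that hurts, here is a back-of-envelope count of producer constructions under the snippet above: one per partition per batch. The numbers (20-second batches, ~30 partitions per stream) come from elsewhere in this thread; treat them as illustrative, not measured on any particular cluster.

```scala
// Each foreachPartition call above builds a fresh producer, which means
// a new TCP connection and metadata fetch every time.
val batchIntervalSeconds = 20 // batch window mentioned in the thread
val partitionsPerStream  = 30 // partition count that got the app going
val batchesPerHour  = 3600 / batchIntervalSeconds
val producersPerHour = batchesPerHour * partitionsPerStream
println(producersPerHour) // 5400 producer setups per hour, per stream
```

Reusing one producer per executor amortizes that setup cost to a handful of constructions for the lifetime of the app.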
I replaced the writeToKafka statements with a rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here's the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms
Yes, you can try it. For example, if you have a cluster of 10 executors and
60 Kafka partitions, you can try to choose 10 receivers * 2 consumer
threads, so each thread will ideally consume 3 partitions; if you increase
the threads to 6, each thread will ideally consume 1 partition. What I think
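That sizing arithmetic can be made explicit (a sketch using only the numbers from the example above: 60 partitions, 10 receivers): partitions are spread evenly over receivers * threads-per-receiver consumer threads.

```scala
// Partitions per consumer thread for the example in this thread:
// 60 Kafka partitions consumed by (receivers * threadsPerReceiver) threads.
val kafkaPartitions = 60
val receivers = 10
def partitionsPerThread(threadsPerReceiver: Int): Int =
  kafkaPartitions / (receivers * threadsPerReceiver)

println(partitionsPerThread(2)) // 3 partitions per thread
println(partitionsPerThread(6)) // 1 partition per thread
```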
Hi Tim,
I think maybe you can try this way:
create a Receiver per executor and specify a thread count larger than 1 for
each topic; the total number of consumer threads will be: total consumers =
(receiver number) * (thread number). Make sure this total is less than or
equal to the number of Kafka
Hi Saisai,
If I understand correctly, you are suggesting to control parallelism by
keeping the number of consumers/executors at least 1:1 with the number of
Kafka partitions. For example, if I have 50 partitions for a Kafka topic,
then either have:
- 25 or more executors, 25 receivers, each receiver set
Just read the thread "Are these numbers abnormal for spark streaming?" and
I think I am seeing similar results - that is, increasing the window seems
to be the trick here. I will have to monitor for a few hours/days before I
can conclude (there are so many knobs/dials).
On Wed, Feb 11, 2015 at
On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka (a
different topic). My big problem has been Total Delay. While execution
time is usually the window size (in seconds), the total delay ranges from
minutes to