Tathagata,
Could the bottleneck be the number of executor nodes in our cluster? Since we
are creating 500 DStreams from 500 textFile directories, do we need at least
500 executors / nodes to act as receivers, one for each of the streams?
On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das wrote:
Thanks a lot @Das @Cody. I moved from the receiver-based to the direct stream,
and I can get the topic from the offset ranges!!
On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com
wrote:
@Brandon, the file streams do not use receivers, so the bottleneck is not
about executors per se. But there could be a couple of bottlenecks:
1. Every batch interval, the 500 DStreams are going to get directory listings
from 500 directories, SEQUENTIALLY. So preparing the batch's RDDs and jobs can
take a long time.
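As a rough illustration (the latency figure is an assumption, not from the
thread): at ~200 ms per S3 directory listing, 500 sequential listings add
roughly 500 × 0.2 s = 100 s of driver-side work to every batch, before any
records are actually processed.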
@Ashwin, you don't need to append the topic to your data if you're using the
direct stream. You can get the topic from the offset ranges; see
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
(search for offsetRange).
If you're using the receiver-based stream, you'll need to append the topic to
the data yourself.
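A minimal sketch of that offset-range approach, assuming the Spark 1.x Kafka
direct API; the broker address and topic names are placeholders, and ssc is an
existing StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topicA", "topicB")) // placeholder topics

stream.foreachRDD { rdd =>
  // Each partition of a direct-stream RDD maps 1:1 to a Kafka
  // topic-partition, so the topic can be looked up by partition index.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val topic = offsetRanges(TaskContext.get.partitionId).topic
    iter.foreach { case (key, value) => () } // topic-specific action goes here
  }
}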
@Das, is there any way to identify a Kafka topic when we have a unified
stream? As of now, for each topic I create a dedicated DStream and use
foreachRDD on each of those streams. If I have, say, 100 Kafka topics, how can
I use a unified stream and still take topic-specific actions inside
foreachRDD?
Thank you Tathagata. My main use case for the 500 streams is to append new
elements into their corresponding Spark SQL tables. Every stream is mapped to
a table, so I'd like to use the streams to append the new RDDs to the tables.
If I union all the streams, appending new elements becomes a problem, because
I can no longer tell which table each element belongs to.
@Ashwin: You could append the topic to the data.
val kafkaStreams = topics.map { topic =>
  KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
}
val unionedStream = context.union(kafkaStreams)
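One hedged way that tag could then drive topic-specific actions inside a
single foreachRDD (the filter-per-topic loop and the caching are my
assumptions, not from the thread):

unionedStream.foreachRDD { rdd =>
  rdd.cache() // avoid recomputing the unioned RDD once per topic
  topics.foreach { topic =>
    // Select only this topic's records, then drop the tag.
    val forTopic = rdd.filter { case (_, t) => t == topic }.map(_._1)
    // topic-specific action on forTopic, e.g. append to that topic's table
  }
  rdd.unpersist()
}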
@Brandon:
I don't recommend it, but you could do something crazy like use the …
I don't think anyone has really run 500 text streams.
And the par sequences do nothing there; you are only parallelizing the setup
code, which does not really compute anything. It also sets up 500 foreachRDD
operations that will get executed in each batch sequentially, so it does not
make sense. …
val ssc = new StreamingContext(sc, Minutes(10))
// 500 textFile streams watching S3 directories
val streams = streamPaths.par.map { path =>
  ssc.textFileStream(path)
}
streams.par.foreach { stream =>
  stream.foreachRDD { rdd =>
    // do something
  }
}
ssc.start()
Would something like this work?
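For contrast, a minimal sketch of the union alternative discussed earlier in
the thread: tag each record with its source path, union once, and run a single
foreachRDD per batch instead of 500 (the path tag and the table-routing
comment are my assumptions):

val ssc = new StreamingContext(sc, Minutes(10))
// Tag each file stream's records with the path they came from,
// then union the 500 streams into one DStream.
val tagged = streamPaths.map { path =>
  ssc.textFileStream(path).map(line => (path, line))
}
val unioned = ssc.union(tagged)
unioned.foreachRDD { rdd =>
  // One job per batch; route records to their table by the path tag.
  // (Table-routing logic is application-specific and assumed here.)
}
ssc.start()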