Thanks a lot @Das @Cody. I moved from the receiver-based approach to the direct stream, and I can get the topics from the offsets!!
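For anyone who hits the same problem, here is roughly what that looks like. This is a sketch against the Spark 1.3+ direct Kafka API; the broker list, topic names, and the ssc variable are placeholders for your own setup:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Placeholders: point these at your own brokers and topics.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("topicA", "topicB")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val withTopics = stream.transform { rdd =>
  // The cast is only valid here, on the RDD the direct stream hands us,
  // before any other transformation is applied.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    // Partition i of a direct-stream RDD maps 1:1 to offset range i,
    // so every record in this partition came from ranges(i).topic.
    val topic = ranges(i).topic
    iter.map { case (_, value) => (topic, value) }
  }
}

The 1:1 correspondence between RDD partitions and Kafka (topic, partition) offset ranges is what makes the per-partition lookup valid; it holds only before any repartitioning or shuffling transformation.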
On Fri, Jul 31, 2015 at 4:41 PM, Brandon White <bwwintheho...@gmail.com> wrote:

> Tathagata,
>
> Could the bottleneck possibly be the number of executor nodes in our
> cluster? Since we are creating 500 DStreams from 500 text file
> directories, do we need at least 500 executors / nodes acting as
> receivers, one for each of the streams?
>
> On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> @Ashwin: You could append the topic to the data.
>>
>> val kafkaStreams = topics.map { topic =>
>>   KafkaUtils.createDirectStream(....topic...).map { x => (x, topic) }
>> }
>> val unionedStream = context.union(kafkaStreams)
>>
>> @Brandon:
>> I don't recommend it, but you could do something crazy like use
>> foreachRDD to farm the jobs out to a thread pool, while the final
>> foreachRDD waits for all the jobs to complete.
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Add a runnable that runs the job on the RDD to the thread pool.
>>     // This does not wait for the job to finish.
>>   }
>> }
>>
>> anyOfTheManyDStreams.foreachRDD { _ =>
>>   // Wait for all of the current batch's jobs in the thread pool
>>   // to complete.
>> }
>>
>> This would run all the Spark jobs in the batch in parallel in the
>> thread pool, but it would also make sure all the jobs finish before
>> the batch is marked as completed.
>>
>> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White <bwwintheho...@gmail.com> wrote:
>>
>>> Thank you Tathagata. My main use case for the 500 streams is to
>>> append new elements to their corresponding Spark SQL tables. Every
>>> stream is mapped to a table, so I'd like to use the streams to append
>>> the new RDDs to their tables. If I union all the streams, appending
>>> new elements becomes a nightmare. So is there no other way to
>>> parallelize something like the following? Will this still run
>>> sequentially, or time out?
>>>
>>> // 500 streams, each paired with its table name
>>> streams.foreach { case (tableName, stream) =>
>>>   stream.foreachRDD { rdd =>
>>>     val df = sqlContext.jsonRDD(rdd)
>>>     df.saveAsTable(tableName, SaveMode.Append)
>>>   }
>>> }
>>>
>>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> I don't think anyone has really run 500 text file streams.
>>>> And the par sequences do nothing there: you are only parallelizing
>>>> the setup code, which does not really compute anything. It also sets
>>>> up 500 foreachRDD operations that will get executed sequentially in
>>>> each batch, so it does not make sense. The right way to parallelize
>>>> this is to union all the streams.
>>>>
>>>> val streams = streamPaths.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>> val unionedStream = ssc.union(streams)
>>>> unionedStream.foreachRDD { rdd =>
>>>>   // do something
>>>> }
>>>>
>>>> Then there is only one foreachRDD executed in every batch, and it
>>>> will process all the new files from each batch interval in parallel.
>>>>
>>>> TD
>>>>
>>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White <bwwintheho...@gmail.com> wrote:
>>>>
>>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>>
>>>>> // 500 textFile streams watching S3 directories
>>>>> val streams = streamPaths.par.map { path =>
>>>>>   ssc.textFileStream(path)
>>>>> }
>>>>>
>>>>> streams.par.foreach { stream =>
>>>>>   stream.foreachRDD { rdd =>
>>>>>     // do something
>>>>>   }
>>>>> }
>>>>>
>>>>> ssc.start()
>>>>>
>>>>> Would something like this scale? What would be the limiting factor
>>>>> for performance? What is the best way to parallelize this? Any
>>>>> other ideas on the design?
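To make TD's thread-pool outline above concrete, here is a minimal sketch of one way it could be filled in. The names manyDStreams and anyOfTheManyDStreams come from his pseudocode; the pool size, the timeout, and the per-partition work are placeholder assumptions, and his caveat still applies, since this fights the framework rather than working with it:

import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Assumption: a fixed pool bounding how many Spark jobs run at once.
implicit val pool: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

// Jobs submitted for the current batch; drained by the final foreachRDD.
val inFlight = new ConcurrentLinkedQueue[Future[Unit]]()

manyDStreams.foreach { dstream =>
  dstream.foreachRDD { rdd =>
    // Submit the Spark job to the pool and return immediately,
    // so the next output operation in the batch can start.
    inFlight.add(Future {
      rdd.foreachPartition { _ => /* real per-partition work here */ }
    })
  }
}

// Registered last, so it runs after all the jobs above were submitted.
anyOfTheManyDStreams.foreachRDD { _ =>
  // Block until every job submitted for this batch has finished, so the
  // batch is not marked complete while work is still running.
  var f = inFlight.poll()
  while (f != null) {
    Await.result(f, 10.minutes)
    f = inFlight.poll()
  }
}

This relies on Spark executing the output operations of a batch in registration order on the driver, and on the SparkContext being safe to call from multiple threads.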
--
Thanks & Regards,
Ashwin Giridharan