@Ashwin: you don't need to append the topic to your data if you're using the direct stream. You can get the topic from the offset ranges; see http://spark.apache.org/docs/latest/streaming-kafka-integration.html (search for "offsetRange").
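
Roughly, the pattern from those docs looks like this (untested sketch, assuming the 1.x direct API with String decoders, and ssc / kafkaParams / topics defined as in your setup):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.transform { rdd =>
      // The direct stream's RDDs implement HasOffsetRanges; partition i of
      // the RDD corresponds to offsetRanges(i), which carries the topic name.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { (i, iter) =>
        val topic = offsetRanges(i).topic
        iter.map { case (_, value) => (topic, value) }
      }
    }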
If you're using the receiver-based stream, you'll need to follow TD's suggestion.

On Tue, Jul 28, 2015 at 8:09 PM, Tathagata Das <t...@databricks.com> wrote:

> @Ashwin: You could append the topic to the data.
>
> val kafkaStreams = topics.map { topic =>
>   KafkaUtils.createDirectStream(...topic...).map { x => (x, topic) }
> }
> val unionedStream = context.union(kafkaStreams)
>
> @Brandon:
> I don't recommend it, but you could do something crazy like use
> foreachRDD to farm the jobs out to a thread pool, with a final
> foreachRDD that waits for all the jobs to complete.
>
> manyDStreams.foreach { dstream =>
>   dstream.foreachRDD { rdd =>
>     // Add a runnable that runs the job on the RDD to the thread pool.
>     // This does not wait for the job to finish.
>   }
> }
>
> anyOfTheManyDStreams.foreachRDD { _ =>
>   // Wait for all of the current batch's jobs in the thread pool to
>   // complete.
> }
>
> This would run all the batch's Spark jobs in parallel in the thread
> pool, but it would also make sure they all finish before the batch is
> marked as completed.
>
> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White <bwwintheho...@gmail.com> wrote:
>
>> Thank you Tathagata. My main use case for the 500 streams is to append
>> new elements into their corresponding Spark SQL tables. Every stream is
>> mapped to a table, so I'd like to use the streams to append the new
>> RDDs to the tables. If I union all the streams, appending new elements
>> becomes a nightmare. So is there no other way to parallelize something
>> like the following? Will this still run sequentially, or time out?
>>
>> // 500 streams, each paired with its target table name
>> streams.foreach { case (tableName, stream) =>
>>   stream.foreachRDD { rdd =>
>>     val df = sqlContext.jsonRDD(rdd)
>>     df.saveAsTable(tableName, SaveMode.Append)
>>   }
>> }
>>
>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> I don't think anyone has really run 500 text streams. And the .par
>>> sequences do nothing there: you are only parallelizing the setup code,
>>> which does not really compute anything. They also set up 500
>>> foreachRDD operations that will get executed sequentially in each
>>> batch, so that does not make sense. The right way to parallelize this
>>> is to union all the streams:
>>>
>>> val streams = streamPaths.map { path =>
>>>   ssc.textFileStream(path)
>>> }
>>> val unionedStream = ssc.union(streams)
>>> unionedStream.foreachRDD { rdd =>
>>>   // do something
>>> }
>>>
>>> Then only one foreachRDD is executed in every batch, and it processes
>>> all the new files from that batch interval in parallel.
>>>
>>> TD
>>>
>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White <bwwintheho...@gmail.com> wrote:
>>>
>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>
>>>> // 500 textFile streams watching S3 directories
>>>> val streams = streamPaths.par.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>>
>>>> streams.par.foreach { stream =>
>>>>   stream.foreachRDD { rdd =>
>>>>     // do something
>>>>   }
>>>> }
>>>>
>>>> ssc.start()
>>>>
>>>> Would something like this scale? What would be the limiting factor
>>>> for performance? What is the best way to parallelize this? Any other
>>>> ideas on design?
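
For concreteness, one untested way TD's thread-pool idea could look using Scala Futures. This assumes the default of one concurrent streaming job (so the output operations run one after another on the driver) and that the waiting foreachRDD is registered after all the others; manyDStreams / anyOfTheManyDStreams are the placeholders from his message, and the pool size is arbitrary:

    import java.util.concurrent.Executors
    import scala.collection.mutable.ArrayBuffer
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    // Arbitrary pool size; tune to how many Spark jobs you want in flight.
    implicit val pool: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

    // All foreachRDD bodies run on the driver, sequentially per batch with
    // the default spark.streaming.concurrentJobs = 1, so a plain buffer
    // should be enough here.
    val inFlight = ArrayBuffer.empty[Future[Unit]]

    manyDStreams.foreach { dstream =>
      dstream.foreachRDD { rdd =>
        // Submit the Spark job to the pool and return immediately.
        inFlight += Future { rdd.foreachPartition(_ => ()) /* real work here */ }
      }
    }

    // Registered last, so it runs last in each batch: block until every job
    // submitted above has finished, then clear the buffer for the next batch.
    anyOfTheManyDStreams.foreachRDD { _ =>
      Await.result(Future.sequence(inFlight), Duration.Inf)
      inFlight.clear()
    }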
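
And on Brandon's table-per-stream use case: one possible middle ground (again a sketch, untested; pathToTable is a hypothetical Map from source path to table name) is to tag each record with its source before the union, then split the unioned RDD back out per table inside a single foreachRDD:

    import org.apache.spark.sql.SaveMode

    // Tag each line with the path of the stream it came from.
    val tagged = streamPaths.map { path =>
      ssc.textFileStream(path).map(line => (path, line))
    }
    val unioned = ssc.union(tagged)

    unioned.foreachRDD { rdd =>
      rdd.cache()  // reused once per table below
      streamPaths.foreach { path =>
        val lines = rdd.filter(_._1 == path).map(_._2)
        if (!lines.isEmpty()) {
          val df = sqlContext.jsonRDD(lines)
          df.saveAsTable(pathToTable(path), SaveMode.Append)
        }
      }
      rdd.unpersist()
    }

That still issues one save per table, so it doesn't remove the sequential appends, but it keeps a single foreachRDD per batch as TD suggests, and whether it beats 500 separate foreachRDDs is something you'd have to measure.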