@Ashwin, you don't need to append the topic to your data if you're using the
direct stream. You can get the topic from the offset ranges; see
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
(search for "offsetRange").
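
For example, a minimal (untested) sketch, where directStream stands in for
whatever KafkaUtils.createDirectStream returned:

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  // Partition i of the direct stream's RDD lines up with offsetRanges(i).
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val withTopic = rdd.mapPartitionsWithIndex { (i, iter) =>
    val topic = offsetRanges(i).topic
    iter.map(record => (topic, record))
  }
  // withTopic is an RDD of (topic, record) pairs
}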

If you're using the receiver-based stream, you'll need to follow TD's
suggestion.

On Tue, Jul 28, 2015 at 8:09 PM, Tathagata Das <t...@databricks.com> wrote:

> @Ashwin: You could append the topic in the data.
>
> val kafkaStreams = topics.map { topic =>
>     KafkaUtils.createDirectStream(....topic...).map { x => (x, topic) }
> }
> val unionedStream = context.union(kafkaStreams)
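>
> Filled in, that might look something like this (untested sketch, assuming
> String keys/values, a kafkaParams map, and topics: Seq[String]):
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val kafkaStreams = topics.map { topic =>
>   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, Set(topic)
>   ).map { case (_, value) => (value, topic) }  // tag each record with its topic
> }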
>
>
> @Brandon:
> I don't recommend it, but you could do something crazy: use foreachRDD to
> farm the jobs out to a thread pool, with the final foreachRDD waiting for
> all of the jobs to complete.
>
> manyDStreams.foreach { dstream =>
>   dstream.foreachRDD { rdd =>
>     // Add a runnable that runs the job on the RDD to the threadpool.
>     // This does not wait for the job to finish.
>   }
> }
>
> anyOfTheManyDStreams.foreachRDD { _ =>
>   // Wait for all of the current batch's jobs in the threadpool to
>   // complete.
> }
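>
> Concretely, that pattern might look something like this (untested sketch;
> the pool size and the per-RDD action are placeholders):
>
> import java.util.concurrent.{ConcurrentLinkedQueue, Executors, Future}
>
> val pool = Executors.newFixedThreadPool(manyDStreams.size)
> val pending = new ConcurrentLinkedQueue[Future[_]]()
>
> manyDStreams.foreach { dstream =>
>   dstream.foreachRDD { rdd =>
>     // Submit the Spark job to the pool without waiting for it to finish.
>     pending.add(pool.submit(new Runnable {
>       def run(): Unit = rdd.foreach(_ => ())  // placeholder Spark action
>     }))
>   }
> }
>
> manyDStreams.head.foreachRDD { _ =>
>   // Registered last, so it runs after all the submissions for this batch:
>   // block until every submitted job has completed.
>   while (!pending.isEmpty) pending.poll().get()
> }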
>
>
> This would run all of the batch's Spark jobs in parallel in the thread
> pool, while also making sure they all finish before the batch is marked
> as completed.
>
> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White <bwwintheho...@gmail.com>
> wrote:
>
>> Thank you Tathagata. My main use case for the 500 streams is to append
>> new elements into their corresponding Spark SQL tables. Every stream is
>> mapped to a table, so I'd like to use the streams to append the new RDDs
>> to their tables. If I union all the streams, appending new elements
>> becomes a nightmare. So is there no other way to parallelize something
>> like the following? Will this still run sequentially, or time out?
>>
>> // 500 (tableName, stream) pairs
>> streams.foreach { case (tableName, stream) =>
>>   stream.foreachRDD { rdd =>
>>     val df = sqlContext.jsonRDD(rdd)
>>     df.saveAsTable(tableName, SaveMode.Append)
>>   }
>> }
>>
>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> I don't think anyone has really run 500 text streams.
>>> And the par sequences do nothing there: you are only parallelizing the
>>> setup code, which does not really compute anything. It also sets up 500
>>> foreachRDD operations that will get executed sequentially in each batch,
>>> so it does not make sense. The right way to parallelize this is to union
>>> all the streams.
>>>
>>> val streams = streamPaths.map { path =>
>>>   ssc.textFileStream(path)
>>> }
>>> val unionedStream = ssc.union(streams)
>>> unionedStream.foreachRDD { rdd =>
>>>   // do something
>>> }
>>>
>>> Then there is only one foreachRDD executed per batch, and it will
>>> process all the new files from each batch interval in parallel.
>>> TD
>>>
>>>
>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White <bwwintheho...@gmail.com>
>>> wrote:
>>>
>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>
>>>> //500 textFile streams watching S3 directories
>>>> val streams = streamPaths.par.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>>
>>>> streams.par.foreach { stream =>
>>>>   stream.foreachRDD { rdd =>
>>>>     //do something
>>>>   }
>>>> }
>>>>
>>>> ssc.start()
>>>>
>>>> Would something like this scale? What would be the limiting factor to
>>>> performance? What is the best way to parallelize this? Any other ideas on
>>>> design?
>>>>
>>>
>>>
>>
>
