@Ashwin: You could append the topic to the data.

val kafkaStreams = topics.map { topic =>
    KafkaUtils.createDirectStream(....topic...).map { x => (x, topic) }
}
val unionedStream = context.union(kafkaStreams)
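
As a more complete sketch, assuming the Spark 1.x direct Kafka API with
string keys and values (StringDecoder), and with ssc, topics, and
kafkaParams standing in for your StreamingContext, topic list, and broker
configuration:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder topic list and broker configuration.
val topics = Seq("topicA", "topicB")
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// One direct stream per topic, each record tagged with its topic.
val kafkaStreams = topics.map { topic =>
  KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic))
    .map { case (_, value) => (value, topic) }
}

// A single unioned stream; downstream code can branch on the topic tag.
val unionedStream = ssc.union(kafkaStreams)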


@Brandon:
I don't recommend it, but you could do something crazy like using foreachRDD
to farm out the jobs to a threadpool, with the final foreachRDD waiting for
all of the jobs to complete.

manyDStreams.foreach { dstream =>
  dstream.foreachRDD { rdd =>
    // Add a runnable that runs the job on this RDD to the threadpool.
    // This does not wait for the job to finish.
  }
}

anyOfTheManyDStreams.foreachRDD { _ =>
  // Wait for all of the current batch's jobs in the threadpool to complete.
}


This would run all the Spark jobs in the batch in parallel in the thread
pool, but it would also make sure all the jobs finish before the batch is
marked as completed.
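
A minimal sketch of that pattern, assuming Scala Futures over a fixed-size
pool. The pool size, the timeout, and the per-RDD action (just a count here)
are placeholders; the waiting foreachRDD must be registered last, since
output operations run within a batch in registration order:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Placeholder fixed-size pool used to submit Spark jobs concurrently.
implicit val jobPool: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

// Futures for the jobs submitted in the current batch.
val pendingJobs = new java.util.concurrent.ConcurrentLinkedQueue[Future[Unit]]()

manyDStreams.foreach { dstream =>
  dstream.foreachRDD { rdd =>
    // Submit the Spark job asynchronously; this does not block the batch.
    pendingJobs.add(Future { rdd.count(); () })  // placeholder action
  }
}

anyOfTheManyDStreams.foreachRDD { _ =>
  // Block until every job submitted for this batch has finished, so the
  // batch is only marked complete once all of its work is done.
  var job = pendingJobs.poll()
  while (job != null) {
    Await.result(job, 10.minutes)
    job = pendingJobs.poll()
  }
}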

On Tue, Jul 28, 2015 at 4:05 PM, Brandon White <bwwintheho...@gmail.com>
wrote:

> Thank you Tathagata. My main use case for the 500 streams is to append new
> elements into their corresponding Spark SQL tables. Every stream is mapped
> to a table, so I'd like to use the streams to append the new RDDs to the
> table. If I union all the streams, appending new elements becomes a
> nightmare. So is there no other way to parallelize something like the
> following? Will this still run sequentially, or time out?
>
> //500 streams
> streams.foreach { stream =>
>   stream.foreachRDD { rdd =>
>     val df = sqlContext.jsonRDD(rdd)
>     df.saveAsTable(streamTuple._1, SaveMode.Append)
>
>   }
> }
>
> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> I don't think anyone has really run 500 text streams.
>> And the par sequences do nothing there; you are only parallelizing the
>> setup code, which does not really compute anything. It also sets up 500
>> foreachRDD operations that will get executed sequentially in each batch,
>> so it does not make sense. The right way to parallelize this is to union
>> all the streams.
>>
>> val streams = streamPaths.map { path =>
>>   ssc.textFileStream(path)
>> }
>> val unionedStream = streamingContext.union(streams)
>> unionedStream.foreachRDD { rdd =>
>>   // do something
>> }
>>
>> Then there is only one foreachRDD executed per batch, and it will process
>> all the new files for that batch interval in parallel.
>> TD
>>
>>
>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White <bwwintheho...@gmail.com>
>> wrote:
>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>
>>> //500 textFile streams watching S3 directories
>>> val streams = streamPaths.par.map { path =>
>>>   ssc.textFileStream(path)
>>> }
>>>
>>> streams.par.foreach { stream =>
>>>   stream.foreachRDD { rdd =>
>>>     //do something
>>>   }
>>> }
>>>
>>> ssc.start()
>>>
>>> Would something like this scale? What would be the limiting factor to
>>> performance? What is the best way to parallelize this? Any other ideas on
>>> design?
>>>
>>
>>
>
