Furthermore, to explain: arr.par.foreach does not help because it does not
actually run anything; it only sets up the computation. Doing the setup in
parallel does not mean that the jobs will run concurrently.
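
To illustrate the difference between setup and execution, here is a toy
model in plain Scala. It involves no Spark at all: ToyContext, register and
start are names made up for this sketch and are not Spark APIs. It only
mimics the idea that declaring output operations (even from several threads)
runs nothing, and that the work happens later, one job after another, once
the context is started.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a streaming context. This is NOT Spark code; it only
// mimics the split between declaring work and running it.
class ToyContext {
  private val outputOps = ArrayBuffer.empty[() => String]

  // "Setup": remember the work, run nothing. Synchronized so that it is
  // safe to call from several threads at once.
  def register(op: () => String): Unit = synchronized { outputOps += op; () }

  // "Execution": run the remembered work one job after another.
  def start(): Seq[String] = synchronized { outputOps.map(op => op()).toList }
}

object ToyDemo {
  def run(): (Int, Seq[String]) = {
    val ctx = new ToyContext
    var executed = 0

    // Register three "joins" from three threads: the setup is parallel.
    val threads = (1 to 3).map { i =>
      new Thread(new Runnable {
        def run(): Unit = ctx.register(() => { executed += 1; s"join-$i" })
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    val ranDuringSetup = executed // nothing has been executed yet
    val results = ctx.start()     // the jobs run here, sequentially
    (ranDuringSetup, results)
  }
}
```

Calling ToyDemo.run() returns the number of jobs executed during setup
together with the results produced by start(). The registration order (and
so the order of the results) can vary between runs, since the threads race
to register.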

Also, from your code it seems that your pairs of DStreams do not interact
with each other (that is, pair1 does not interact with pair2). In that case
you could run them in separate applications, which would allow them to run
in parallel.
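
As a rough sketch of the separate-applications idea, assuming Spark 1.3 with
the spark-streaming-kafka artifact on the classpath: the object name
JoinOnePair, the argument layout, the 10-second batch interval and the
getKey helper are all assumptions made for illustration (getKey stands in
for your getKey1/getKey2, which I have not seen). Each submitted instance
handles exactly one pair of topics.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// One application per topic pair; submit it once for each pair.
object JoinOnePair {
  // Hypothetical key extractor standing in for getKey1/getKey2:
  // takes everything before the first comma as the join key.
  def getKey(record: String): String = record.takeWhile(_ != ',')

  def main(args: Array[String]): Unit = {
    // Assumed argument layout: <brokers> <topic1> <topic2> <outputPrefix>
    val Array(brokers, topic1, topic2, outputPrefix) = args

    val conf = new SparkConf().setAppName(s"join-$topic1-$topic2")
    val ssc = new StreamingContext(conf, Seconds(10)) // interval is a guess
    val kafkaParams = Map("metadata.broker.list" -> brokers)

    // Build a keyed direct stream for a single topic.
    def keyedStream(topic: String) =
      KafkaUtils
        .createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set(topic))
        .map(a => (getKey(a._2), a._2))

    // Join the two streams of this pair and write each batch out.
    keyedStream(topic1)
      .join(keyedStream(topic2), 4)
      .saveAsTextFiles(outputPrefix)

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Passing a distinct output prefix to each instance also keeps the pairs from
writing to the same location. Since each instance is a separate application,
the cluster manager schedules them independently of one another.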


On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> You can enable this flag to run multiple jobs concurrently. It might not
> be production ready, but you can give it a try:
>
> sparkConf.set("spark.streaming.concurrentJobs", "2")
>
> Refer to TD's answer here
> <http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header>
> for more information.
>
>
> Thanks
> Best Regards
>
> On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal <abhaybansal.1...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a use case in which I have to join multiple Kafka topics in
>> parallel. If there are 2n topics, there is a one-to-one mapping between
>> the topics that need to be joined.
>>
>>
>>     val arr = ...
>>
>>     for (condition) {
>>       val dStream1 = KafkaUtils
>>         .createDirectStream[String, String, StringDecoder, StringDecoder](
>>           ssc, kafkaParams, topics1)
>>         .map(a => (getKey1(a._2), a._2))
>>
>>       val dStream2 = KafkaUtils
>>         .createDirectStream[String, String, StringDecoder, StringDecoder](
>>           ssc, kafkaParams, topics2)
>>         .map(a => (getKey2(a._2), a._2))
>>
>>       arr(counter) = (dStream1, dStream2)
>>       counter += 1
>>     }
>>
>>     arr.par.foreach {
>>       case (dStream1, dStream2) =>
>>         try {
>>           val joined = dStream1.join(dStream2, 4)
>>           joined.saveAsTextFiles("joinedData")
>>         } catch {
>>           case t: Exception => t.printStackTrace()
>>         }
>>     }
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>>
>> When I do this, the streams are joined sequentially. Is there a way
>> around this? I am new to Spark and would appreciate any suggestions.
>>
>>
>> Thanks,
>>
>> -Abhay
>>
>
>
