You can enable this flag to run multiple jobs concurrently. It might not be
production ready, but you can give it a try:

sc.set("spark.streaming.concurrentJobs","2")

Refer to TD's answer here
<http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header>
for more information.
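
For reference, here is a minimal sketch of wiring this up (the app name and
batch interval below are placeholders): as far as I can tell, the property
needs to be set on the SparkConf used to build the StreamingContext. Also note
that the .par.foreach in your code only builds the DStream graph; the actual
per-batch jobs are scheduled after ssc.start(), and with the default setting
of 1 they run one at a time.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Placeholder app name and batch interval; the relevant line is the
    // spark.streaming.concurrentJobs setting, applied before the
    // StreamingContext is created.
    val conf = new SparkConf()
      .setAppName("parallel-topic-joins")
      .set("spark.streaming.concurrentJobs", "2") // experimental: run 2 batch jobs concurrently

    val ssc = new StreamingContext(conf, Seconds(10))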


Thanks
Best Regards

On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal <abhaybansal.1...@gmail.com>
wrote:

> Hi,
>
> I have a use case wherein I have to join multiple Kafka topics in parallel.
> So if there are 2n topics, there is a one-to-one mapping of topics which
> need to be joined.
>
>
>     val arr= ...
>
>     for(condition) {
>
>         val dStream1 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topics1
> ).map(a=>(getKey1(a._2),a._2))
>
>         val dStream2 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topics2
> ).map(a=>(getKey2(a._2),a._2))
>
>        arr(counter) = (dStream1, dStream2);
>
>        counter+=1;
>
>     }
>
>
>
>     arr.par.foreach {
>
>             case(dStream1, dStream2) => try {
>
>                 val joined = dStream1.join(dStream2,4);
>
>                 joined.saveAsTextFiles("joinedData")
>
>             }
>
>             catch {
>
>                 case t: Exception => t.printStackTrace();
>
>             }
>
>         }
>
>
>
>     ssc.start()
>
>     ssc.awaitTermination()
>
>
> Doing so, the streams are getting joined sequentially. Is there a way
> out of this? I am new to Spark and would appreciate any suggestions around
> this.
>
>
> Thanks,
>
> -Abhay
>
