You can enable this flag to run multiple jobs concurrently. It might not be production ready, but you can give it a try:
sc.set("spark.streaming.concurrentJobs", "2")

Refer to TD's answer here
<http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header>
for more information. A sketch of how this could fit your loop follows the
quoted code below.

Thanks
Best Regards

On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal <abhaybansal.1...@gmail.com> wrote:

> Hi,
>
> I have a use case wherein I have to join multiple Kafka topics in
> parallel. So if there are 2n topics, there is a one-to-one mapping
> between the topics that need to be joined.
>
> val arr = ...
>
> for (condition) {
>
>   val dStream1 = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaParams, topics1)
>     .map(a => (getKey1(a._2), a._2))
>
>   val dStream2 = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaParams, topics2)
>     .map(a => (getKey2(a._2), a._2))
>
>   arr(counter) = (dStream1, dStream2)
>   counter += 1
> }
>
> arr.par.foreach {
>   case (dStream1, dStream2) =>
>     try {
>       val joined = dStream1.join(dStream2, 4)
>       joined.saveAsTextFiles("joinedData")
>     } catch {
>       case t: Exception => t.printStackTrace()
>     }
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> Doing so, the streams are getting joined sequentially. Is there a way
> out of this? I am new to Spark and would appreciate any suggestions
> around this.
>
> Thanks,
> -Abhay
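A fuller sketch of how the flag and your loop could fit together. This is a
minimal outline, not a drop-in fix: the app name, batch interval, broker
address, topic names, and the getKey1/getKey2 stubs are placeholder
assumptions for your own values, and spark.streaming.concurrentJobs is an
undocumented setting, so test it before relying on it. Note that
arr.par.foreach only parallelizes *building* the DStream graph; join and
saveAsTextFiles are lazy, so the per-batch jobs still run one at a time
unless concurrentJobs is raised.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object JoinTopics {
  def main(args: Array[String]): Unit = {
    // Default is 1, which is why the joins ran back to back; raise as needed.
    val conf = new SparkConf()
      .setAppName("kafka-topic-joins")
      .set("spark.streaming.concurrentJobs", "2")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // Your 2n topics, paired one-to-one.
    val topicPairs: Seq[(Set[String], Set[String])] = Seq(
      (Set("topicA1"), Set("topicA2")),
      (Set("topicB1"), Set("topicB2")))

    // Placeholder key extractors; local vals keep the closures serializable.
    val getKey1 = (v: String) => v.takeWhile(_ != ',')
    val getKey2 = (v: String) => v.takeWhile(_ != ',')

    for (((topics1, topics2), i) <- topicPairs.zipWithIndex) {
      val dStream1 = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, topics1)
        .map(a => (getKey1(a._2), a._2))
      val dStream2 = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, topics2)
        .map(a => (getKey2(a._2), a._2))
      // This only registers work; execution happens per batch. A distinct
      // output prefix per pair keeps saveAsTextFiles from colliding.
      dStream1.join(dStream2, 4).saveAsTextFiles(s"joinedData-$i")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

With concurrentJobs > 1 the job scheduler can run the output jobs from
different joins within the same batch concurrently instead of queueing them.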