Thanks for your suggestions. sc.set("spark.streaming.concurrentJobs","2") works, but I am not sure about using it in production.
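
A minimal sketch of applying this setting, assuming it is placed on the SparkConf before the StreamingContext is created. The application name and the 10-second batch interval are placeholder assumptions, not values from the job discussed in this thread, and spark.streaming.concurrentJobs is an undocumented setting, so treat it as experimental.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name; the setting must be in place before the context is built.
val conf = new SparkConf()
  .setAppName("ParallelTopicJoins")
  // Allow up to two streaming jobs to run at the same time (the default is 1).
  .set("spark.streaming.concurrentJobs", "2")

// Placeholder batch interval of 10 seconds.
val ssc = new StreamingContext(conf, Seconds(10))
```

The same key can also be passed at submit time with --conf spark.streaming.concurrentJobs=2, which avoids hard-coding it in the application.
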
@TD: The number of streams that we are interacting with is very large. Managing that many applications would just be overhead. Moreover, there are other operations which may be performed on the output of the join operation.

Thanks,
-Abhay

On Wed, Apr 22, 2015 at 11:31 PM, Tathagata Das <t...@databricks.com> wrote:

> Furthermore, just to explain, doing arr.par.foreach does not help because
> it is not really running anything; it is only doing the setup of the
> computation. Doing the setup in parallel does not mean that the jobs will
> be done concurrently.
>
> Also, from your code it seems like your pairs of dstreams don't interact
> with each other (that is, pair1 doesn't interact with pair2). Then you could
> run them in separate applications, which would allow them to run in
> parallel.
>
>
> On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> You can enable this flag to run multiple jobs concurrently. It might not
>> be production ready, but you can give it a try:
>>
>> sc.set("spark.streaming.concurrentJobs","2")
>>
>> Refer to TD's answer here
>> <http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header>
>> for more information.
>>
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal <abhaybansal.1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a use case wherein I have to join multiple Kafka topics in
>>> parallel. So if there are 2n topics, there is a one-to-one mapping of
>>> topics which need to be joined.
>>>
>>>
>>> val arr= ...
>>>
>>> for(condition) {
>>>   val dStream1 = KafkaUtils.createDirectStream[String, String,
>>>     StringDecoder, StringDecoder](ssc, kafkaParams, topics1)
>>>     .map(a => (getKey1(a._2), a._2))
>>>
>>>   val dStream2 = KafkaUtils.createDirectStream[String, String,
>>>     StringDecoder, StringDecoder](ssc, kafkaParams, topics2)
>>>     .map(a => (getKey2(a._2), a._2))
>>>
>>>   arr(counter) = (dStream1, dStream2)
>>>   counter += 1
>>> }
>>>
>>> arr.par.foreach {
>>>   case (dStream1, dStream2) => try {
>>>     val joined = dStream1.join(dStream2, 4)
>>>     joined.saveAsTextFiles("joinedData")
>>>   }
>>>   catch {
>>>     case t: Exception => t.printStackTrace()
>>>   }
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>>
>>> Doing so, the streams are getting joined sequentially. Is there a way
>>> around this? I am new to Spark and would appreciate any suggestions.
>>>
>>>
>>> Thanks,
>>>
>>> -Abhay
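
TD's point in this thread, that doing the setup in parallel does not make the jobs run concurrently, can be illustrated with a toy model. The sketch below is not Spark code. It assumes, as a simplification, that the streaming scheduler runs jobs on a fixed-size thread pool whose size plays the role of spark.streaming.concurrentJobs. The names ConcurrentJobsModel and maxObservedConcurrency are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Toy model only (not Spark code): jobs are first described, then run on a
// fixed-size pool that stands in for spark.streaming.concurrentJobs.
object ConcurrentJobsModel {

  /** Runs `numJobs` jobs on `poolSize` threads and returns the largest
    * number of jobs that were observed running at the same moment. */
  def maxObservedConcurrency(numJobs: Int, poolSize: Int): Int = {
    val lock = new Object
    var running = 0
    var peak = 0
    // Reaches zero once two jobs have started; lets overlapping jobs see each other.
    val twoStarted = new CountDownLatch(2)

    // Setup phase: this only builds descriptions of the work. Building them
    // sequentially or in parallel makes no difference to how they run later.
    val jobs: Seq[Runnable] = (1 to numJobs).map { _ =>
      new Runnable {
        def run(): Unit = {
          lock.synchronized { running += 1; peak = math.max(peak, running) }
          twoStarted.countDown()
          // Hold the job open briefly so a concurrent job can overlap with it.
          twoStarted.await(500, TimeUnit.MILLISECONDS)
          lock.synchronized { running -= 1 }
        }
      }
    }

    // Execution phase: concurrency is bounded by the pool size alone.
    val pool = Executors.newFixedThreadPool(poolSize)
    jobs.foreach(job => pool.execute(job))
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    lock.synchronized(peak)
  }

  def main(args: Array[String]): Unit = {
    println("pool size 1: peak = " + maxObservedConcurrency(4, 1))
    println("pool size 2: peak = " + maxObservedConcurrency(4, 2))
  }
}
```

With a pool of one thread the peak can never exceed 1, however the jobs were set up, which mirrors the sequential joins reported above. With two threads the peak is normally 2, although that depends on thread timing.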