Please remove query1.awaitTermination(); query2.awaitTermination();
once query1.awaitTermination(); is called, you don't even get to query2.awaitTermination(). On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote: > Hi Burak, > > Thanks much! had no clue that existed. Now, I changed it to this. > > StreamingQuery query1 = > outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new > KafkaSink("hello1")).start(); > StreamingQuery query2 = > outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new > KafkaSink("hello2")).start(); > > query1.awaitTermination(); > query2.awaitTermination(); > sparkSession.streams().awaitAnyTermination(); > > > > > > On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote: > >> Hey Kant, >> >> That won't work either. Your second query may fail, and as long as your >> first query is running, you will not know. Put this as the last line >> instead: >> >> spark.streams.awaitAnyTermination() >> >> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote: >> >>> Looks like my problem was the order of awaitTermination() for some >>> reason. >>> >>> *Doesn't work * >>> >>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>> KafkaSink("hello1")).start().awaitTermination() >>> >>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>> KafkaSink("hello2")).start().awaitTermination() >>> >>> *Works* >>> >>> StreamingQuery query1 = outputDS1.writeStream().trigge >>> r(Trigger.processingTime(1000)).foreach(new >>> KafkaSink("hello1")).start(); >>> >>> query1.awaitTermination() >>> >>> StreamingQuery query2 =outputDS2.writeStream().trigg >>> er(Trigger.processingTime(1000)).foreach(new >>> KafkaSink("hello2")).start(); >>> >>> query2.awaitTermination() >>> >>> >>> >>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> >>> wrote: >>> >>>> Looks like my problem was the order of awaitTermination() for some >>>> reason. >>>> >>>> Doesn't work >>>> >>>> >>>> >>>> >>>> >>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> >>>> wrote: >>>> >>>>> Hi All, >>>>> >>>>> I have the following Psuedo code (I could paste the real code however >>>>> it is pretty long and involves Database calls inside dataset.map operation >>>>> and so on) so I am just trying to simplify my question. would like to know >>>>> if there is something wrong with the following pseudo code? >>>>> >>>>> DataSet<String> inputDS = readFromKaka(topicName) >>>>> >>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works >>>>> Since I can see data getting populated >>>>> >>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as >>>>> well >>>>> >>>>> DataSet<String> outputDS2 = mongoDS.map( readFromDatabase); // Doesn't >>>>> work >>>>> >>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>>>> KafkaSink("hello1")).start().awaitTermination() >>>>> >>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>>>> KafkaSink("hello2")).start().awaitTermination() >>>>> >>>>> >>>>> *So what's happening with above code is that I can see data coming out >>>>> of hello1 topic but not from hello2 topic.* I thought there is >>>>> something wrong with "outputDS2" so I switched the order so now the code >>>>> looks like this >>>>> >>>>> DataSet<String> inputDS = readFromKaka(topicName) >>>>> >>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works >>>>> Since I can see data getting populated >>>>> >>>>> DataSet<String> outputDS2 = mongoDS.map( readFromDatabase); // This >>>>> Works >>>>> >>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Desn't >>>>> work >>>>> >>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>>>> KafkaSink("hello1")).start().awaitTermination() >>>>> >>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new >>>>> KafkaSink("hello2")).start().awaitTermination() >>>>> >>>>> *Now I can see data coming out from hello2 kafka topic but not from >>>>> hello1 topic*. *In short, I can only see data from outputDS1 or >>>>> outputDS2 but not both. * At this point I am not sure what is going >>>>> on? >>>>> >>>>> Thanks! >>>>> >>>>> >>>>> >>>> >>> >> >