Please remove

query1.awaitTermination();
query2.awaitTermination();

Once

query1.awaitTermination();

is called, the driver blocks there. You never get to
query2.awaitTermination(), let alone
sparkSession.streams().awaitAnyTermination().
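
To make the blocking behaviour concrete, here is a minimal sketch in plain Java. It is not Spark code: a CompletableFuture stands in for each StreamingQuery, and the class and method names (AwaitAnyDemo, awaitFirstOnly, awaitAny) are made up for illustration. It only shows why waiting on the first query hides a failure in the second, while waiting on "any" does not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in model of two streaming queries. NOT Spark code: each
// CompletableFuture plays the role of one StreamingQuery (assumption).
class AwaitAnyDemo {

    // Mimics query1.awaitTermination(): waits on the first query only.
    // Returns true if the wait ended within the timeout, false if still blocked.
    static boolean awaitFirstOnly(CompletableFuture<?> query1, long timeoutMs) {
        return waitFor(query1, timeoutMs);
    }

    // Mimics streams().awaitAnyTermination(): returns as soon as ANY query
    // terminates, whether normally or with an error.
    static boolean awaitAny(long timeoutMs, CompletableFuture<?>... queries) {
        return waitFor(CompletableFuture.anyOf(queries), timeoutMs);
    }

    private static boolean waitFor(CompletableFuture<?> f, long timeoutMs) {
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                        // terminated normally
        } catch (ExecutionException e) {
            return true;                        // terminated with an error
        } catch (TimeoutException e) {
            return false;                       // still running, caller was blocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // query1 never completes: a healthy, long-running query.
        CompletableFuture<Void> query1 = new CompletableFuture<>();
        // query2 has already failed.
        CompletableFuture<Void> query2 = new CompletableFuture<>();
        query2.completeExceptionally(new IllegalStateException("query2 failed"));

        System.out.println("first-only noticed termination: " + awaitFirstOnly(query1, 200));
        System.out.println("await-any noticed termination: " + awaitAny(200, query1, query2));
    }
}
```

In the real driver the equivalent change is the one described above: start both queries, drop the two per-query awaitTermination() calls, and keep sparkSession.streams().awaitAnyTermination() as the only blocking call.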


On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Burak,
>
> Thanks much! I had no clue that existed. I have now changed it to this:
>
> StreamingQuery query1 = outputDS1.writeStream()
>     .trigger(Trigger.processingTime(1000))
>     .foreach(new KafkaSink("hello1"))
>     .start();
> StreamingQuery query2 = outputDS2.writeStream()
>     .trigger(Trigger.processingTime(1000))
>     .foreach(new KafkaSink("hello2"))
>     .start();
>
> query1.awaitTermination();
> query2.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
>
>
>
>
> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey Kant,
>>
>> That won't work either. Your second query may fail, and as long as your
>> first query is running, you will not know. Put this as the last line
>> instead:
>>
>> spark.streams.awaitAnyTermination()
>>
>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> *Doesn't work*
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Works*
>>>
>>> StreamingQuery query1 = outputDS1.writeStream()
>>>     .trigger(Trigger.processingTime(1000))
>>>     .foreach(new KafkaSink("hello1")).start();
>>>
>>> query1.awaitTermination()
>>>
>>> StreamingQuery query2 = outputDS2.writeStream()
>>>     .trigger(Trigger.processingTime(1000))
>>>     .foreach(new KafkaSink("hello2")).start();
>>>
>>> query2.awaitTermination()
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Looks like my problem was the order of awaitTermination() for some
>>>> reason.
>>>>
>>>> Doesn't work
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following pseudo code. (I could paste the real code, but it
>>>>> is pretty long and involves database calls inside dataset.map operations
>>>>> and so on, so I am just trying to simplify my question.) I would like to
>>>>> know if there is something wrong with it.
>>>>>
>>>>> Dataset<String> inputDS = readFromKafka(topicName)
>>>>>
>>>>> // Works, since I can see data getting populated
>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>
>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>>>>
>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>
>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>
>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>
>>>>>
>>>>> *What happens with the above code is that I can see data coming out
>>>>> of the hello1 topic but not the hello2 topic.* I thought there was
>>>>> something wrong with "outputDS2", so I switched the order, and now the
>>>>> code looks like this:
>>>>>
>>>>> Dataset<String> inputDS = readFromKafka(topicName)
>>>>>
>>>>> // Works, since I can see data getting populated
>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>
>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>>>>
>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>
>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>
>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>
>>>>> *Now I can see data coming out of the hello2 Kafka topic but not the
>>>>> hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>> outputDS2, but not both.* At this point I am not sure what is going
>>>>> on.
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
