Hey Kant,

That won't work either. Your second query may fail, and as long as your
first query is running, you will not know. Put this as the last line
instead:

spark.streams().awaitAnyTermination();
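To see why the order matters, here is a small plain-Java model of the driver thread. It is not Spark code. start() returns immediately, while awaitTermination() blocks the calling thread until that query stops. FakeQuery, chained and startAllThenAwaitAny are hypothetical names used only for illustration, and CompletableFuture.anyOf stands in for awaitAnyTermination(). Each fake query stops after a few batches so the demo finishes. A real streaming query normally runs until it is stopped, so in the chained version the second query would never start at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model (NOT Spark): start() is non-blocking, awaitTermination() blocks.
public class AwaitOrderDemo {

    // Hypothetical stand-in for a StreamingQuery: runs a few "batches" on a pool thread.
    static final class FakeQuery {
        final CompletableFuture<Void> done;

        FakeQuery(ExecutorService pool, int batches, long batchMillis) {
            // Like start(): schedules the work and returns immediately.
            this.done = CompletableFuture.runAsync(() -> {
                for (int i = 0; i < batches; i++) {
                    try {
                        Thread.sleep(batchMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, pool);
        }

        // Like awaitTermination(): blocks the calling (driver) thread until this query ends.
        void awaitTermination() {
            done.join();
        }
    }

    // Pattern from the original post: start().awaitTermination() chained per query.
    static List<String> chained(ExecutorService pool) {
        List<String> log = new ArrayList<>();
        log.add("start q1");
        new FakeQuery(pool, 3, 10).awaitTermination(); // driver blocks here
        log.add("q1 terminated");
        log.add("start q2");                           // only reached after q1 ends
        new FakeQuery(pool, 3, 10).awaitTermination();
        log.add("q2 terminated");
        return log;
    }

    // Suggested pattern: start every query first, then wait for any one to terminate.
    static List<String> startAllThenAwaitAny(ExecutorService pool) {
        List<String> log = new ArrayList<>();
        log.add("start q1");
        FakeQuery q1 = new FakeQuery(pool, 3, 10);
        log.add("start q2");
        FakeQuery q2 = new FakeQuery(pool, 3, 10);
        // Analogue of awaitAnyTermination(): returns when either query finishes.
        CompletableFuture.anyOf(q1.done, q2.done).join();
        log.add("some query terminated");
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(chained(pool));
        System.out.println(startAllThenAwaitAny(pool));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Running main prints [start q1, q1 terminated, start q2, q2 terminated] and then [start q1, start q2, some query terminated]. In the first pattern the driver cannot reach the second start() until the first query has ended. In the second, both queries are running before the driver blocks.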

On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> *Doesn't work*
>
> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>
> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
>
> *Works*
>
> StreamingQuery query1 = outputDS1.writeStream()
>     .trigger(Trigger.ProcessingTime(1000))
>     .foreach(new KafkaSink("hello1")).start();
>
> query1.awaitTermination();
>
> StreamingQuery query2 = outputDS2.writeStream()
>     .trigger(Trigger.ProcessingTime(1000))
>     .foreach(new KafkaSink("hello2")).start();
>
> query2.awaitTermination();
>
>
>
> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Looks like my problem was the order of awaitTermination() for some reason.
>>
>> Doesn't work
>>
>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following pseudocode. (I could paste the real code, but it is
>>> pretty long and involves database calls inside a dataset.map operation
>>> and so on, so I am trying to simplify my question.) I would like to know
>>> if there is something wrong with the following pseudocode.
>>>
>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>
>>> // Works, since I can see data getting populated
>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>
>>> // Works as well
>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>
>>> // Doesn't work
>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>
>>> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
>>>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>>
>>> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
>>>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>>
>>>
>>> *So what's happening with the above code is that I can see data coming
>>> out of the hello1 topic but not the hello2 topic.* I thought something
>>> was wrong with "outputDS2", so I switched the order, and now the code
>>> looks like this:
>>>
>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>
>>> // Works, since I can see data getting populated
>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>
>>> // This works
>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>
>>> // Doesn't work
>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>
>>> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
>>>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>>
>>> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
>>>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>>
>>> *Now I can see data coming out of the hello2 Kafka topic but not the
>>> hello1 topic.* *In short, I can only see data from outputDS1 or
>>> outputDS2, but not both.* At this point I am not sure what is going on.
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>
