Thanks TD! And sorry for the typo. It's very helpful to know that whatever I was achieving with DStreams I can also achieve with Structured Streaming.
It seems like there was some other error in my code, which I fixed, and it seems to be working fine now! Thanks again!

On Thu, Sep 14, 2017 at 12:23 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Are you sure the code is correct? A Dataset does not have a method
> "trigger". Rather, I believe the correct code should be
>
> StreamingQuery query = resultDataSet.writeStream
>     .trigger(ProcessingTime(1000))
>     .format("kafka")
>     .start();
>
> You can do all the same things with Structured Streaming that you can do
> with DStreams. For example, there is foreach in Structured Streaming, e.g.
> resultDataSet.writeStream.foreach(...)
>
> When you say the mapPartitions code is not getting executed... are you
> sure the query is running? Maybe actual code (not pseudocode) would help
> debug this.
>
> On Wed, Sep 13, 2017 at 11:20 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to read data from Kafka, insert it into Mongo, then read from
>> Mongo and insert back into Kafka. I went with the Structured Streaming
>> approach first; however, I believe I am making some naive error because
>> my map operations are not getting invoked.
>>
>> The pseudocode looks like this:
>>
>> Dataset<String> resultDataSet = jsonDataset
>>     .mapPartitions(insertIntoMongo)
>>     .mapPartitions(readFromMongo);
>>
>> StreamingQuery query = resultDataSet
>>     .trigger(ProcessingTime(1000))
>>     .format("kafka")
>>     .start();
>>
>> query.awaitTermination();
>>
>> The mapPartitions in this code is not getting executed. Is this because I
>> am not calling any action on my streaming dataset? In the DStream case, I
>> used to call foreachRDD and it worked well. So how do I do this using
>> Structured Streaming?
>>
>> Thanks!
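For anyone landing on this thread later, TD's correction can be sketched end to end in Java. This is a minimal sketch, not a tested program: the topic names, bootstrap servers, checkpoint path, and the identity `mapPartitions` lambdas (standing in for the original `insertIntoMongo` / `readFromMongo` functions, which were not posted) are all placeholders, and it assumes the Spark 2.2+ Structured Streaming API (`Trigger.ProcessingTime`) plus the spark-sql-kafka connector on the classpath.

```java
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class KafkaMongoKafkaPipeline {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-mongo-kafka")
                .getOrCreate();

        // Source: read from Kafka and pull out the message value as a string.
        // Topic and servers are placeholders.
        Dataset<String> jsonDataset = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "input-topic")
                .load()
                .selectExpr("CAST(value AS STRING) AS value")
                .as(Encoders.STRING());

        // Transformations: identity lambdas stand in for the real
        // insertIntoMongo / readFromMongo partition functions.
        Dataset<String> resultDataSet = jsonDataset
                .mapPartitions(
                        (MapPartitionsFunction<String, String>) it -> it,
                        Encoders.STRING())
                .mapPartitions(
                        (MapPartitionsFunction<String, String>) it -> it,
                        Encoders.STRING());

        // Sink: the trigger goes on writeStream(), not on the Dataset itself.
        // The Kafka sink expects a "value" column, which the string encoder
        // provides by default.
        StreamingQuery query = resultDataSet.writeStream()
                .trigger(Trigger.ProcessingTime(1000))
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "output-topic")
                .option("checkpointLocation", "/tmp/kafka-mongo-checkpoint")
                .start();

        query.awaitTermination();
    }
}
```

Note that in Structured Streaming `start()` plays the role an action did in the DStream world: until the query is started, transformations like `mapPartitions` are just a lazy plan and never execute, which matches the behavior described above.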