Just to be crystal clear: DStreams will be deprecated sooner or later, and there will be no support, so you are strongly advised to migrate...
G

On Sun, 4 Apr 2021, 19:23 Ali Gouta, <ali.go...@gmail.com> wrote:

Thanks Mich!

Ali Gouta.

On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Ali,

The old saying that one experiment is worth a hundred hypotheses still stands.

As per the test-driven approach, have a go at it and see what comes out. Forum members, including myself, have reported on SSS in the Spark user group, so you are at home on this.

HTH

On Sun, 4 Apr 2021 at 17:28, Ali Gouta <ali.go...@gmail.com> wrote:

Great, so SSS also provides an API that allows handling RDDs through DataFrames using foreachBatch. Still, I am not sure this is good practice in general, right? Well, it depends on the use case in any case.

Thank you so much for the hints!

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Ali,

On the practical side, I have used both the old DStreams and the newer Spark Structured Streaming (SSS).

SSS does a good job at the micro-batch level in the form of

    foreachBatch(SendToSink)

"foreach" performs custom write logic on each row, and "foreachBatch" performs custom write logic on each micro-batch through the SendToSink function. foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch. Using foreachBatch, we eventually write each micro-batch to the storage defined in our custom logic. In this case, we store the output of our streaming application in Redis, a Google BigQuery table, or any other sink (a minimal sketch follows at the end of this message).

In the DStream world you would have done something like the below:

    // work on every stream
    dstream.foreachRDD { pricesRDD =>
      if (!pricesRDD.isEmpty) { // data exists in RDD
        // ...
      }
    }

and after some work on that RDD you would have created a DataFrame (df).

With regard to SSS, it allows you to use the passed DataFrame for your work. However, if you were interested in individual rows of the micro-batch (say, in my case, a different collection of prices for different tickers (securities)), you could collect the rows of the micro-batch DataFrame on the driver:

    for row in df.rdd.collect():
        ticker = row.ticker
        price = row.price

With regard to foreach(process_row), I have not really tried it as we don't have a use case for it, so I assume your mileage varies as usual.

HTH
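A minimal PySpark sketch of the foreachBatch pattern described above (the rate source, the sink path, and the function name send_to_sink are illustrative assumptions, not the exact code referred to in the message):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("sss-foreachbatch").getOrCreate()

    # Illustrative streaming source; in practice this would be Kafka.
    streaming_df = (spark.readStream
        .format("rate")               # built-in test source: timestamp/value rows
        .option("rowsPerSecond", 10)
        .load())

    def send_to_sink(df, batch_id):
        # df is the micro-batch as a DataFrame; batch_id is its unique id.
        # Any batch-style writer can go here (Redis, BigQuery, JDBC, files, ...).
        df.write.mode("append").parquet("/tmp/output/rate")

    query = (streaming_df.writeStream
        .foreachBatch(send_to_sink)
        .option("checkpointLocation", "/tmp/checkpoints/rate")  # progress tracked here
        .start())

    query.awaitTermination()

Note that collect() in the row-by-row example above pulls the whole micro-batch to the driver, so it is only appropriate when each batch is small.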
On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:

Thank you guys for your answers. I will dig more into this new way of doing things and consider leaving the old DStreams for Structured Streaming. I hope that Structured Streaming + Spark on Kubernetes works well and that the combination is production-ready.

Best regards,
Ali Gouta.

On Sun, 4 Apr 2021 at 12:52, Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Just to add to Gabor's excellent answer: checkpointing and offsets are infrastructure-related and should not really be in the hands of Spark devs, who should instead focus on the business purpose of the code (not offsets, which are very low-level and not really important).

BTW, that's what happens in Kafka Streams too.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

There is no way to store offsets in Kafka and restart from the stored offset. Structured Streaming stores offsets in its checkpoint and restarts from there without any user code.

Offsets can be stored with a listener, but that can only be used for lag calculation.

BR,
G

On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:

Hello,

I was reading the Spark docs about Spark Structured Streaming, since we are thinking about updating our code base that today uses DStreams, hence Spark Streaming. One main reason we want this change is that reading headers in Kafka messages is only supported in Spark Structured Streaming, not in DStreams.

I was surprised not to see an obvious way to handle offsets manually by committing them to Kafka. In Spark Streaming we used to do it with something similar to these lines of code:

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

And this works perfectly! In particular, it works very nicely in the case of job failure/restart... I am wondering how this can be achieved in Spark Structured Streaming?

I read about checkpoints, and this reminds me of the old way of doing things in Spark 1.5/Kafka 0.8; it is not perfect, since we are not deciding when to commit offsets ourselves.

Did I miss anything? What would be the best way of committing offsets to Kafka with Spark Structured Streaming for the relevant consumer group?

Best regards,
Ali Gouta.
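For completeness, a minimal PySpark sketch of the checkpoint-based approach described above (the broker address, topic name, and paths are illustrative assumptions). With a checkpointLocation set, Structured Streaming records the Kafka offsets itself and resumes from them after a failure or restart; there is no commitAsync equivalent to call:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka-sss-checkpoint").getOrCreate()

    # Kafka source; Structured Streaming manages offsets via the checkpoint,
    # not via the Kafka consumer group.
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")  # assumption
        .option("subscribe", "prices")                      # assumption
        .option("startingOffsets", "earliest")  # only consulted on the first run
        .option("includeHeaders", "true")       # Kafka headers (Spark 3.0+)
        .load())

    events = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

    query = (events.writeStream
        .format("parquet")                                        # illustrative sink
        .option("path", "/tmp/output/prices")
        .option("checkpointLocation", "/tmp/checkpoints/prices")  # offsets live here
        .start())

    query.awaitTermination()

If the offsets are still wanted in Kafka for external lag monitoring, a StreamingQueryListener can read them from each progress event and publish them out of band, but, as noted above, that serves lag calculation only, not restart semantics.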