Just to be crystal clear: DStreams will be deprecated sooner or later, and
there will be no support, so you are highly advised to migrate...

G


On Sun, 4 Apr 2021, 19:23 Ali Gouta, <ali.go...@gmail.com> wrote:

> Thanks Mich!
>
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi Ali,
>>
>> The old saying that one experiment is worth a hundred hypotheses still
>> stands.
>>
>> As per the test-driven approach, have a go at it and see what comes out.
>> Forum members, including myself, have reported on SSS in the Spark user
>> group, so you are at home on this.
>>
>> HTH,
>>
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 4 Apr 2021 at 17:28, Ali Gouta <ali.go...@gmail.com> wrote:
>>
>>> Great, so SSS also provides an API that allows handling RDDs through
>>> DataFrames using foreachBatch. Still, I am not sure this is good
>>> practice in general, right? Well, it depends on the use case anyway.
>>>
>>> Thank you so much for the hints!
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi Ali,
>>>>
>>>>
>>>> On the practical side, I have used both the old DStreams and the newer
>>>> Spark Structured Streaming (SSS).
>>>>
>>>>
>>>> SSS does a good job at micro-batch level in the form of
>>>>
>>>>
>>>> foreachBatch(SendToSink)
>>>>
>>>>
>>>> "foreach" performs custom write logic on each row, while "foreachBatch"
>>>> performs custom write logic on each micro-batch through the SendToSink
>>>> function. foreachBatch(SendToSink) expects two parameters: first, the
>>>> micro-batch as a DataFrame or Dataset, and second, a unique id for each
>>>> batch. Using foreachBatch, we eventually write each micro-batch to the
>>>> storage defined in our custom logic. In this case, we store the output of
>>>> our streaming application in Redis, a Google BigQuery table, or any other
>>>> sink.
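>>>>
>>>> As a rough illustration only, a minimal sketch of this in Scala
>>>> (sendToSink is just a placeholder name for the SendToSink function above;
>>>> the broker address, topic and paths are placeholders too, and a real Redis
>>>> or BigQuery write would need the respective connector):
>>>>
>>>> import org.apache.spark.sql.{DataFrame, SparkSession}
>>>>
>>>> val spark = SparkSession.builder.appName("sss-foreachBatch").getOrCreate()
>>>>
>>>> // streaming DataFrame read from Kafka (options are placeholders)
>>>> val streamingDF = spark.readStream
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", "broker:9092")
>>>>   .option("subscribe", "prices")
>>>>   .load()
>>>>
>>>> // custom write logic applied to every micro-batch
>>>> val sendToSink: (DataFrame, Long) => Unit = (df, batchId) => {
>>>>   // placeholder sink: in practice Redis, BigQuery, JDBC, files, ...
>>>>   df.write.mode("append").parquet("/tmp/prices_out")
>>>> }
>>>>
>>>> streamingDF.writeStream
>>>>   .foreachBatch(sendToSink)
>>>>   .option("checkpointLocation", "/tmp/checkpoints/prices")
>>>>   .start()
>>>>   .awaitTermination()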
>>>>
>>>>
>>>>
>>>> In the DStream world you would have done something like the below:
>>>>
>>>>
>>>>     // Work on every micro-batch (RDD) of the DStream
>>>>     dstream.foreachRDD { pricesRDD =>
>>>>       if (!pricesRDD.isEmpty)  // data exists in RDD
>>>>       {
>>>>         // transform pricesRDD and create a DataFrame (df) here ...
>>>>       }
>>>>     }
>>>>
>>>> and after some work on that RDD you would have created a DataFrame (df).
>>>>
>>>> With regard to SSS, it allows you to use the passed DataFrame for your
>>>> work. However, if, as in my case, you were interested in individual rows
>>>> of the micro-batch (say, different collections of prices for different
>>>> tickers/securities), you could create an RDD from the DataFrame:
>>>>
>>>> for row in df.rdd.collect():
>>>>     ticker = row.ticker
>>>>     price = row.price
>>>>
>>>>
>>>> With regard to foreach(process_row), I have not really tried it, as we
>>>> don't have a use case for it, so I assume your mileage may vary, as usual.
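>>>>
>>>> For reference, an untested sketch of that row-level sink in Scala,
>>>> assuming a streaming DataFrame like the streamingDF above (PySpark's
>>>> foreach(process_row) takes a plain function instead; the column names
>>>> and the println "sink" are just for illustration):
>>>>
>>>> import org.apache.spark.sql.{ForeachWriter, Row}
>>>>
>>>> val writer = new ForeachWriter[Row] {
>>>>   def open(partitionId: Long, epochId: Long): Boolean = {
>>>>     // open a connection to the real sink here; return false to skip
>>>>     true
>>>>   }
>>>>   def process(row: Row): Unit = {
>>>>     // custom write logic per row, e.g. push ticker/price to Redis
>>>>     val ticker = row.getAs[String]("ticker")
>>>>     val price = row.getAs[Double]("price")
>>>>     println(s"$ticker -> $price")
>>>>   }
>>>>   def close(errorOrNull: Throwable): Unit = {
>>>>     // release the connection here
>>>>   }
>>>> }
>>>>
>>>> streamingDF.writeStream.foreach(writer).start()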
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:
>>>>
>>>>> Thank you guys for your answers. I will dig more into this new way of
>>>>> doing things and consider leaving the old DStreams and using Structured
>>>>> Streaming instead. I hope that Structured Streaming + Spark on Kubernetes
>>>>> works well and the combination is production ready.
>>>>>
>>>>> Best regards,
>>>>> Ali Gouta.
>>>>>
>>>>> On Sun, Apr 4, 2021 at 12:52, Jacek Laskowski <ja...@japila.pl>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Just to add to Gabor's excellent answer: checkpointing and offsets are
>>>>>> infrastructure-related and should not really be in the hands of Spark
>>>>>> devs, who should instead focus on the business purpose of the code (not
>>>>>> offsets, which are very low-level and not really important).
>>>>>>
>>>>>> BTW, that's what happens in Kafka Streams too.
>>>>>>
>>>>>> Pozdrawiam,
>>>>>> Jacek Laskowski
>>>>>> ----
>>>>>> https://about.me/JacekLaskowski
>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> There is no way to store offsets in Kafka and restart from the
>>>>>>> stored offset. Structured Streaming stores offsets in the checkpoint
>>>>>>> and restarts from there without any user code.
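>>>>>>>
>>>>>>> A minimal sketch of that (broker, topic and paths are placeholders);
>>>>>>> the only thing user code supplies is the checkpoint location, and on
>>>>>>> restart the query resumes from the offsets recorded there:
>>>>>>>
>>>>>>> val df = spark.readStream
>>>>>>>   .format("kafka")
>>>>>>>   .option("kafka.bootstrap.servers", "broker:9092")
>>>>>>>   .option("subscribe", "my_topic")
>>>>>>>   .option("startingOffsets", "earliest")  // only used on the first run
>>>>>>>   .load()
>>>>>>>
>>>>>>> df.writeStream
>>>>>>>   .format("parquet")
>>>>>>>   .option("path", "/data/out")
>>>>>>>   .option("checkpointLocation", "/data/checkpoints/my_query")
>>>>>>>   .start()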
>>>>>>>
>>>>>>> Offsets can be captured with a listener, but that can only be used
>>>>>>> for lag calculation.
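>>>>>>>
>>>>>>> For the lag-calculation part, a rough sketch of such a listener (what
>>>>>>> you do with the endOffset JSON, e.g. feed it to a metrics system, is
>>>>>>> up to you):
>>>>>>>
>>>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener._
>>>>>>>
>>>>>>> spark.streams.addListener(new StreamingQueryListener {
>>>>>>>   def onQueryStarted(event: QueryStartedEvent): Unit = {}
>>>>>>>   def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
>>>>>>>   def onQueryProgress(event: QueryProgressEvent): Unit = {
>>>>>>>     // endOffset is a JSON string: topic -> partition -> offset
>>>>>>>     event.progress.sources.foreach { s =>
>>>>>>>       println(s"source ${s.description} processed up to ${s.endOffset}")
>>>>>>>     }
>>>>>>>   }
>>>>>>> })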
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I was reading the Spark docs about Spark Structured Streaming,
>>>>>>>> since we are thinking about updating our code base that today uses
>>>>>>>> DStreams, hence Spark Streaming. Also, one main reason we want to make
>>>>>>>> this change is that reading headers in Kafka messages is only
>>>>>>>> supported in Spark Structured Streaming and not in DStreams.
>>>>>>>>
>>>>>>>> I was surprised not to see an obvious way to handle offsets manually
>>>>>>>> by committing them to Kafka. In Spark Streaming we used to do it with
>>>>>>>> something similar to these lines of code:
>>>>>>>>
>>>>>>>> stream.foreachRDD { rdd =>
>>>>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>>
>>>>>>>>   // some time later, after outputs have completed
>>>>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> And this works perfectly! In particular, it works very nicely in the
>>>>>>>> case of job failure/restart... I am wondering how this can be achieved
>>>>>>>> in Spark Structured Streaming?
>>>>>>>>
>>>>>>>> I read about checkpoints, and this reminds me of the old way of doing
>>>>>>>> things in Spark 1.5/Kafka 0.8, which is not perfect since we are not
>>>>>>>> deciding ourselves when to commit offsets.
>>>>>>>>
>>>>>>>> Did I miss anything? What would be the best way of committing
>>>>>>>> offsets to Kafka for the concerned consumer group with Spark
>>>>>>>> Structured Streaming?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Ali Gouta.
>>>>>>>>
>>>>>>>
