Hello @Tathagata Das <tathagata.das1...@gmail.com>
Could you share your thoughts on
https://issues.apache.org/jira/browse/SPARK-48418 ? Let me know if you have
any questions. thanks.

Regards,
Anil

On Fri, May 24, 2024 at 12:13 AM Anil Dasari <adas...@guidewire.com> wrote:

> It appears that structured streaming and Dstream have entirely different
> microbatch metadata representation
> Can someone assist me in finding the following Dstream microbatch metadata
> equivalent in Structured streaming.
>
> 1. microbatch timestamp : structured streaming foreachBatch gives batchID
> which is not a timestamp. Is there a way to get the microbatch timestamp ?
> 2. microbatch start event ?
> 3. scheduling delay of a microbatch ?
> 4. pending microbatches in case of fixed internal microbatches ?
>
> Thanks
>
> On Wed, May 22, 2024 at 5:23 PM Anil Dasari <adas...@guidewire.com> wrote:
>
>> You are right.
>> - another question on migration. Is there a way to get the microbatch id
>> during the microbatch dataset `trasform` operation like in rdd transform ?
>> I am attempting to implement the following pseudo functionality with
>> structured streaming. In this approach, recordCategoriesMetadata is fetched
>> and rdd metrics like rdd size etc using microbatch idin the transform
>> operation.
>> ```code
>> val rddQueue = new mutable.Queue[RDD[Int]]()
>> // source components
>> val sources = Seq.empty[String]
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ssc.queueStream(rddQueue)
>> inputStream.transform((rdd, ts) => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> rdd
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .foreachRDD((rdd, ts) => {
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = rdd
>> .filter {
>> case (source, sourceRDD) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>>
>> In structured streaming, it can look like -
>>
>> ```code
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ... (for each source)
>> inputStream
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .writeStream
>> .foreachBatch((ds, ts) => {
>> val newDS = ds.transform((internalDS => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> internalDS
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })(... <encoder>)
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = newDS
>> .filter {
>> case (source, sourceDS) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>> ^ is just pseudo code and still not sure if it works. Let me know your
>> suggestions if any. thanks.
>>
>> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> The right way to associated microbatches when committing to external
>>> storage is to use the microbatch id that you can get in foreachBatch. That
>>> microbatch id guarantees that the data produced in the batch is the always
>>> the same no matter any recomputations (assuming all processing logic is
>>> deterministic). So you can commit the batch id + batch data together. And
>>> then async commit the batch id + offsets.
>>>
>>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
>>> wrote:
>>>
>>>> Thanks Das, Mtich.
>>>>
>>>> Mitch,
>>>> We process data from Kafka and write it to S3 in Parquet format using
>>>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>>>> process records micro-batch offsets to an external storage at the end of
>>>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>>>
>>>> Das,
>>>> Thanks for sharing the details. I will look into them.
>>>> Unfortunately, the listeners process is async and can't
>>>> guarantee happens before association with microbatch to commit offsets to
>>>> external storage. But still they will work. Is there a way to access
>>>> lastProgress in foreachBatch ?
>>>>
>>>>
>>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> If you want to find what offset ranges are present in a microbatch in
>>>>> Structured Streaming, you have to look at the
>>>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>>>> Both of these approaches gives you access to the SourceProgress
>>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>
>>>>> which gives Kafka offsets as a JSON string.
>>>>>
>>>>> Hope this helps!
>>>>>
>>>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> OK to understand better your current model relies on streaming data
>>>>>> input through Kafka topic, Spark does some ETL and you send to a sink, a
>>>>>> database for file storage like HDFS etc?
>>>>>>
>>>>>> Your current architecture relies on Direct Streams (DStream) and RDDs
>>>>>> and you want to move to Spark sStructured Streaming based on dataframes 
>>>>>> and
>>>>>> datasets?
>>>>>>
>>>>>> You have not specified your sink
>>>>>>
>>>>>> With regard to your question?
>>>>>>
>>>>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>>>> external checkpoint store ?"
>>>>>>
>>>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>>>> Structured Streaming. However, Structured Streaming provides mechanisms 
>>>>>> to
>>>>>> achieve similar functionality:
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>> <https://en.everybodywiki.com/Mich_Talebzadeh>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>>> expert opinions (Werner
>>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
>>>>>> Braun
>>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>
>>>>>> )".
>>>>>>
>>>>>>
>>>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> what options are you considering yourself?
>>>>>>>
>>>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>>>> adas...@guidewire.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We are on Spark 3.x and using Spark dstream + kafka and planning to
>>>>>>> use structured streaming + Kafka.
>>>>>>> Is there an equivalent of Dstream HasOffsetRanges in structure
>>>>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>>>>> external checkpoint store ? Thanks in advance.
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>>

Reply via email to