Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-06-20 Thread Anil Dasari
Hello @Tathagata Das 
Could you share your thoughts on
https://issues.apache.org/jira/browse/SPARK-48418 ? Let me know if you have
any questions. thanks.

Regards,
Anil

On Fri, May 24, 2024 at 12:13 AM Anil Dasari  wrote:

> It appears that Structured Streaming and DStream have entirely different
> micro-batch metadata representations.
> Can someone help me find the Structured Streaming equivalents of the
> following DStream micro-batch metadata?
>
> 1. Micro-batch timestamp: Structured Streaming's foreachBatch gives a batchId,
> which is not a timestamp. Is there a way to get the micro-batch timestamp?
> 2. Micro-batch start event?
> 3. Scheduling delay of a micro-batch?
> 4. Pending micro-batches, in the case of fixed-interval micro-batches?
>
> Thanks
>
> On Wed, May 22, 2024 at 5:23 PM Anil Dasari  wrote:
>
>> You are right.
>> - Another question on migration: is there a way to get the microbatch id
>> during the micro-batch dataset `transform` operation, as with the RDD transform?
>> I am attempting to implement the following pseudo functionality with
>> Structured Streaming. In this approach, recordCategoriesMetadata is fetched
>> and RDD metrics (such as RDD size) are emitted using the microbatch id in the
>> transform operation.
>> ```code
>> val rddQueue = new mutable.Queue[RDD[Int]]()
>> // source components
>> val sources = Seq.empty[String]
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ssc.queueStream(rddQueue)
>> inputStream.transform((rdd, ts) => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> rdd
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .foreachRDD((rdd, ts) => {
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = rdd
>> .filter {
>> case (source, sourceRDD) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>>
>> In structured streaming, it can look like -
>>
>> ```code
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ... (for each source)
>> inputStream
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .writeStream
>> .foreachBatch((ds, ts) => {
>> val newDS = ds.transform((internalDS => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> internalDS
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })(... )
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = newDS
>> .filter {
>> case (source, sourceDS) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>> ^ is just pseudo code and still not sure if it works. Let me know your
>> suggestions if any. thanks.
>>
>> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> The right way to associate micro-batches when committing to external
>>> storage is to use the microbatch id that you can get in foreachBatch. That
>>> microbatch id guarantees that the data produced in the batch is always
>>> the same no matter any recomputations (assuming all processing logic is
>>> deterministic). So you can commit the batch id + batch data together, and
>>> then async commit the batch id + offsets.
>>>
>>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
>>> wrote:
>>>
 Thanks Das, Mich.

 Mich,
 We process data from Kafka and write it to S3 in Parquet format using
 Dstreams. To ensure exactly-once delivery and prevent data loss, our
 process records micro-batch offsets to an external storage at the end of
 each micro-batch in foreachRDD, which is then used when the job restarts.

 Das,
 Thanks for sharing the details. I will look into them.
 Unfortunately, the listener processing is async and can't guarantee a
 happens-before association with the micro-batch when committing offsets to
 external storage. Still, they will work. Is there a way to access
 lastProgress in foreachBatch?


 On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the
> StreamingQuery.lastProgress or use the QueryProgressListener
> .
> Both of these approaches give you access to the SourceProgress
> 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-24 Thread Anil Dasari
It appears that Structured Streaming and DStream have entirely different
micro-batch metadata representations.
Can someone help me find the Structured Streaming equivalents of the
following DStream micro-batch metadata?

1. Micro-batch timestamp: Structured Streaming's foreachBatch gives a batchId,
which is not a timestamp. Is there a way to get the micro-batch timestamp?
2. Micro-batch start event?
3. Scheduling delay of a micro-batch?
4. Pending micro-batches, in the case of fixed-interval micro-batches?

Thanks
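
A minimal sketch for items 1 and 2, assuming the Spark 3.x Scala API (the listener
class name is made up here): the progress event carries both the batchId that
foreachBatch sees and the trigger timestamp of that micro-batch.

```code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener: progress.batchId is the same id foreachBatch receives,
// and progress.timestamp is the trigger time of that micro-batch.
class BatchMetadataListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}") // query-level event, not per micro-batch

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // p.timestamp is an ISO-8601 string; p.durationMs holds per-phase timings
    println(s"batch ${p.batchId} triggered at ${p.timestamp}, durations=${p.durationMs}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

val spark = SparkSession.builder().getOrCreate()
spark.streams.addListener(new BatchMetadataListener)
```

Items 3 and 4 have no direct counterpart as far as I know; the per-phase timings
in durationMs plus the configured trigger interval are the closest raw material
for deriving a scheduling delay.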

On Wed, May 22, 2024 at 5:23 PM Anil Dasari  wrote:

> You are right.
> - another question on migration. Is there a way to get the microbatch id
> during the microbatch dataset `trasform` operation like in rdd transform ?
> I am attempting to implement the following pseudo functionality with
> structured streaming. In this approach, recordCategoriesMetadata is fetched
> and rdd metrics like rdd size etc using microbatch idin the transform
> operation.
> ```code
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // source components
> val sources = Seq.empty[String]
> val consolidatedDstream = sources
> .map(source => {
> val inputStream = ssc.queueStream(rddQueue)
> inputStream.transform((rdd, ts) => {
> // emit metrics of microbatch ts : rdd size etc.
>
> val recordCategories = rdd.map(..).collect();
> val recordCategoriesMetadata = ...
> rdd
> .map(r =>
> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
> (source, customRecord)
> )
> })
> }
> )
> .reduceLeft(_ union _)
>
> consolidatedDstream
> .foreachRDD((rdd, ts) => {
> // get pipes for each source
> val pipes = Seq.empty[String] // pipes of given source
> pipes.foreach(pipe => {
> val pipeSource = null; // get from pipe variable
> val psRDD = rdd
> .filter {
> case (source, sourceRDD) => source.equals(pipeSource)
> }
> // apply pipe transformation and sink
>
> })
> })
> ```
>
> In structured streaming, it can look like -
>
> ```code
> val consolidatedDstream = sources
> .map(source => {
> val inputStream = ... (for each source)
> inputStream
> }
> )
> .reduceLeft(_ union _)
>
> consolidatedDstream
> .writeStream
> .foreachBatch((ds, ts) => {
> val newDS = ds.transform((internalDS => {
> // emit metrics of microbatch ts : rdd size etc.
>
> val recordCategories = rdd.map(..).collect();
> val recordCategoriesMetadata = ...
> internalDS
> .map(r =>
> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
> (source, customRecord)
> )
> })(... )
> // get pipes for each source
> val pipes = Seq.empty[String] // pipes of given source
> pipes.foreach(pipe => {
> val pipeSource = null; // get from pipe variable
> val psRDD = newDS
> .filter {
> case (source, sourceDS) => source.equals(pipeSource)
> }
> // apply pipe transformation and sink
>
> })
> })
> ```
> ^ is just pseudo code and still not sure if it works. Let me know your
> suggestions if any. thanks.
>
> On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
> wrote:
>
>> The right way to associated microbatches when committing to external
>> storage is to use the microbatch id that you can get in foreachBatch. That
>> microbatch id guarantees that the data produced in the batch is the always
>> the same no matter any recomputations (assuming all processing logic is
>> deterministic). So you can commit the batch id + batch data together. And
>> then async commit the batch id + offsets.
>>
>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
>> wrote:
>>
>>> Thanks Das, Mtich.
>>>
>>> Mitch,
>>> We process data from Kafka and write it to S3 in Parquet format using
>>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>>> process records micro-batch offsets to an external storage at the end of
>>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>>
>>> Das,
>>> Thanks for sharing the details. I will look into them.
>>> Unfortunately, the listeners process is async and can't
>>> guarantee happens before association with microbatch to commit offsets to
>>> external storage. But still they will work. Is there a way to access
>>> lastProgress in foreachBatch ?
>>>
>>>
>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 If you want to find what offset ranges are present in a microbatch in
 Structured Streaming, you have to look at the
 StreamingQuery.lastProgress or use the QueryProgressListener
 .
 Both of these approaches gives you access to the SourceProgress
 
 which gives Kafka offsets as a JSON string.

 Hope this helps!

 On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> OK to understand better your current model relies on streaming data
> input through Kafka topic, Spark does some ETL and you send to a sink, 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
You are right.
- Another question on migration: is there a way to get the microbatch id
during the micro-batch dataset `transform` operation, as with the RDD transform?
I am attempting to implement the following pseudo functionality with
Structured Streaming. In this approach, recordCategoriesMetadata is fetched
and RDD metrics (such as RDD size) are emitted using the microbatch id in the
transform operation.
```code
val rddQueue = new mutable.Queue[RDD[Int]]()
// source components
val sources = Seq.empty[String]
val consolidatedDstream = sources
  .map { source =>
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.transform { (rdd, ts) =>
      // emit metrics of microbatch ts: rdd size etc.
      val recordCategories = rdd.map(..).collect()
      val recordCategoriesMetadata = ...
      rdd.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }
    }
  }
  .reduceLeft(_ union _)

consolidatedDstream
  .foreachRDD { (rdd, ts) =>
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from pipe variable
      val psRDD = rdd
        .filter { case (source, sourceRDD) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    }
  }
```

In Structured Streaming, it can look like this:

```code
val consolidatedDstream = sources
  .map { source =>
    val inputStream = ... // (for each source)
    inputStream
  }
  .reduceLeft(_ union _)

consolidatedDstream
  .writeStream
  .foreachBatch { (ds, ts) =>
    val newDS = ds.transform { internalDS =>
      // emit metrics of microbatch ts: size etc.
      val recordCategories = internalDS.map(..).collect()
      val recordCategoriesMetadata = ...
      internalDS.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }
    }
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from pipe variable
      val psRDD = newDS
        .filter { case (source, sourceDS) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    }
  }
```
The above is just pseudo code and I am still not sure whether it works. Let me
know your suggestions, if any. Thanks.
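
As a rough illustration only (not the original poster's code): a sketch of how
the foreachBatch batchId, which is a Long rather than a timestamp, can take over
the role the DStream transform timestamp plays above. CustomRecord, the
construction of the per-source streaming Dataset, and the metrics/sink steps are
assumed placeholders.

```code
import org.apache.spark.sql.Dataset

// Hypothetical record type mirroring the pseudo code above
case class CustomRecord(payload: String, meta: String)

// consolidatedDstream is assumed to be a streaming Dataset[(String, CustomRecord)]
def runPipes(consolidatedDstream: Dataset[(String, CustomRecord)],
             pipes: Seq[String]): Unit = {
  consolidatedDstream
    .writeStream
    .foreachBatch { (batchDS: Dataset[(String, CustomRecord)], batchId: Long) =>
      batchDS.persist() // the batch is reused once per pipe below
      // emit micro-batch metrics keyed by batchId (stand-in for a real metrics sink)
      println(s"batch $batchId size=${batchDS.count()}")
      pipes.foreach { pipeSource =>
        val pipeDS = batchDS.filter { case (source, _) => source == pipeSource }
        // apply the pipe transformation and write to its sink here
      }
      batchDS.unpersist()
      ()
    }
    .option("checkpointLocation", "/path/to/checkpoint") // example path
    .start()
  ()
}
```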

On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
wrote:

> The right way to associated microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch. That
> microbatch id guarantees that the data produced in the batch is the always
> the same no matter any recomputations (assuming all processing logic is
> deterministic). So you can commit the batch id + batch data together. And
> then async commit the batch id + offsets.
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
> wrote:
>
>> Thanks Das, Mtich.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listeners process is async and can't guarantee happens
>> before association with microbatch to commit offsets to external storage.
>> But still they will work. Is there a way to access lastProgress in
>> foreachBatch ?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at the
>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>> .
>>> Both of these approaches gives you access to the SourceProgress
>>> 
>>> which gives Kafka offsets as a JSON string.
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 OK to understand better your current model relies on streaming data
 input through Kafka topic, Spark does some ETL and you send to a sink, a
 database for file storage like HDFS etc?

 Your current architecture relies on Direct Streams (DStream) and RDDs
 and you want to move to Spark sStructured Streaming based on dataframes and
 datasets?

 You have not specified your sink

 With regard to your question?

 "Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ?"

 There is not a direct equivalent of DStream HasOffsetRanges in Spark
 Structured Streaming. However, Structured Streaming provides mechanisms to
 achieve similar functionality:

 HTH

 Mich Talebzadeh,
 Technologist | Architect | Data Engineer  | 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
With regard to this sentence


*Offset Tracking with Structured Streaming: While storing offsets in an
external storage was necessary with DStreams, SSS handles this
automatically through checkpointing. The checkpoints include the offsets
processed by each micro-batch. However, you can still access the most
recent offsets through StreamingQuery.lastProgress for
monitoring purposes, that is, if you need it.*

In essence, with SSS and checkpointing in place, you can rely on the
automatic offset management provided by the framework,
*eliminating the need for the custom offset storage you had with DStreams.*
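
A minimal sketch of what that looks like in practice (Scala; the Kafka source
options, parquet sink and paths are examples, not the poster's actual setup):
offsets live in the checkpoint, and progress is read only for monitoring.

```code
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic_name")
  .load()
  .writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/output")            // example sink path
  .option("checkpointLocation", "s3a://my-bucket/chk") // offsets are tracked here
  .start()

// Optional monitoring only: latest completed micro-batch progress, if any.
// sources(i).endOffset carries the Kafka offsets as a JSON string.
Option(query.lastProgress).foreach(p => println(p.json))
```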

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 19:49, Mich Talebzadeh 
wrote:

> Hi  Anil,
>
> Ok let us put the complete picture here
>
>* Current DStreams Setup:*
>
>- Data Source: Kafka
>- Processing Engine: Spark DStreams
>- Data Transformation with Spark
>- Sink: S3
>- Data Format: Parquet
>- Exactly-Once Delivery (Attempted): You're attempting exactly-once
>delivery by recording micro-batch offsets in an external storage using
>foreachRDD at the end of each micro-batch. This allows you to potentially
>restart the job from the last processed offset in case of failures?
>- Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
>offer limited built-in support for exactly-once delivery guarantees.
>
>
> *Moving to Spark Structured Streaming: (SSS)*
>
> All stays the same. except below
>
>
>- Exactly-Once Delivery which is guaranteed by SSS
>- Checkpointing: Enable checkpointing by setting the
>checkpointLocation option in  writeStream. Spark will periodically
>checkpoint the state of streaming query, including offsets, to a designated
>location (e.g., HDFS, cloud storage or SSD).
>- Offset Tracking with Structured Streaming:: While storing offsets in
>an external storage with DStreams was necessary, SSS handles this
>automatically through checkpointing. The checkpoints include the offsets
>processed by each micro-batch. However, you can still access the most
>recent offsets using the offset() method on your StreamingQuery object for
>monitoring purposes that is if you need it
>
> Have a look at this article of mine  about  structured streaming  and
> checkpointing
>
> Processing Change Data Capture with Spark Structured Streaming
> 
>
> In your case briefly
>
> def *store_offsets_to_checkpoint*(df, batchId):
> if(len(df.take(1))) > 0:
>  df. persist()
>  # Extract offsets from the DataFrame (assuming a column named
> 'offset')
>  offset_rows = df.select(col('offset')).rdd.collect()
>  # Create OffsetRange objects from extracted offsets
>  offsets = [OffsetRange(partition=row.partition,
> fromOffset=row.offset, toOffset=row.offset + 1) # Assuming 'partition'
> and 'offset' columns
> for row in offset_rows]
>  # Logic to store offsets in your external checkpoint store)
>   ..
>   df.unpersist()
> else:
>   print("DataFrame is empty")
>
> # Define your Structured Streaming application with Kafka source and sink
>
>"""
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> *store_offsets_to_checkpoint* function
> foreachBatch(*store_offsets_to_checkpoint*) expects 2
> parameters, first: micro-batch as DataFrame or Dataset and second: unique
> id for each batch
>Using foreachBatch, we write each micro batch to storage
> defined in our custom logic
> """
>
> streaming = spark.readStream \
>.format("kafka") \ .
> option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "topic_name") \
>   .load()
>
> # Custom sink function to store offsets in checkpoint
> streaming = streaming.writeStream \
>  . format("memory")  \
>  *.option("checkpointLocation", "/path/to/checkpoint/store") \ *
>   .foreachBatch(*store_offsets_to_checkpoint*) \
>   .start()
>
> HTH
>
>
> 
>
> 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
Hi Anil,

Ok let us put the complete picture here

   * Current DStreams Setup:*

   - Data Source: Kafka
   - Processing Engine: Spark DStreams
   - Data Transformation with Spark
   - Sink: S3
   - Data Format: Parquet
   - Exactly-Once Delivery (Attempted): You're attempting exactly-once
   delivery by recording micro-batch offsets in an external storage using
   foreachRDD at the end of each micro-batch. This allows you to potentially
   restart the job from the last processed offset in case of failures?
   - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
   offer limited built-in support for exactly-once delivery guarantees.


*Moving to Spark Structured Streaming: (SSS)*

All stays the same, except below:


   - Exactly-Once Delivery which is guaranteed by SSS
   - Checkpointing: Enable checkpointing by setting the checkpointLocation
   option in writeStream. Spark will periodically checkpoint the state of the
   streaming query, including offsets, to a designated location (e.g., HDFS,
   cloud storage or SSD).
   - Offset Tracking with Structured Streaming: While storing offsets in
   an external storage was necessary with DStreams, SSS handles this
   automatically through checkpointing. The checkpoints include the offsets
   processed by each micro-batch. However, you can still access the most
   recent offsets through StreamingQuery.lastProgress for
   monitoring purposes, that is, if you need it

Have a look at this article of mine about Structured Streaming and
checkpointing:

Processing Change Data Capture with Spark Structured Streaming


In your case, briefly:

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        df.persist()
        # Extract offsets from the DataFrame (assuming a column named 'offset')
        offset_rows = df.select(col('offset')).rdd.collect()
        # Create OffsetRange objects from the extracted offsets
        # (assuming 'partition' and 'offset' columns)
        offsets = [OffsetRange(partition=row.partition,
                               fromOffset=row.offset,
                               toOffset=row.offset + 1)
                   for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        ..
        df.unpersist()
    else:
        print("DataFrame is empty")

# Define your Structured Streaming application with Kafka source and sink

"""
"foreach" performs custom write logic on each row and "foreachBatch" performs
custom write logic on each micro-batch through the store_offsets_to_checkpoint
function. foreachBatch(store_offsets_to_checkpoint) expects 2 parameters,
first: the micro-batch as a DataFrame or Dataset and second: a unique id for
each batch.
Using foreachBatch, we write each micro-batch to the storage defined in our
custom logic.
"""

streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink function to store offsets in checkpoint
streaming = streaming.writeStream \
    .format("memory") \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()

HTH



Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 16:27, Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
The right way to associate micro-batches when committing to external
storage is to use the microbatch id that you can get in foreachBatch. That
microbatch id guarantees that the data produced in the batch is always
the same no matter any recomputations (assuming all processing logic is
deterministic). So you can commit the batch id + batch data together, and
then async commit the batch id + offsets.
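
A rough sketch of that pattern (Scala; the source options and paths are
examples, and commitBatch / commitOffsetsAsync are hypothetical stand-ins for
an external store, not a Spark API): the batch is written keyed by batchId so a
replayed batch overwrites rather than duplicates, and offsets are recorded
asynchronously from progress events.

```code
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

val spark = SparkSession.builder().getOrCreate()

// Hypothetical external-store helpers; replace with your own storage layer.
def commitBatch(batchId: Long, batchDF: DataFrame): Unit =
  batchDF.write
    .mode(SaveMode.Overwrite)                            // idempotent per batch id
    .parquet(s"s3a://my-bucket/data/batch_id=$batchId")

def commitOffsetsAsync(batchId: Long, offsetsJson: String): Unit = () // stub

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // example source options
  .option("subscribe", "topic_name")
  .load()

// 1) commit batch id + batch data together, inside foreachBatch
val writer = kafkaStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    commitBatch(batchId, batchDF)
  }
  .option("checkpointLocation", "s3a://my-bucket/chk")

// 2) commit batch id + offsets asynchronously, from progress events
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryProgress(e: QueryProgressEvent): Unit =
    commitOffsetsAsync(e.progress.batchId,
      e.progress.sources.map(_.endOffset).mkString("[", ",", "]"))
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()
})

writer.start()
```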

On Wed, May 22, 2024 at 11:27 AM Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what offset ranges are present in a microbatch in
>> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
>> use the QueryProgressListener
>> .
>> Both of these approaches gives you access to the SourceProgress
>> 
>> which gives Kafka offsets as a JSON string.
>>
>> Hope this helps!
>>
>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK to understand better your current model relies on streaming data
>>> input through Kafka topic, Spark does some ETL and you send to a sink, a
>>> database for file storage like HDFS etc?
>>>
>>> Your current architecture relies on Direct Streams (DStream) and RDDs
>>> and you want to move to Spark sStructured Streaming based on dataframes and
>>> datasets?
>>>
>>> You have not specified your sink
>>>
>>> With regard to your question?
>>>
>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>> streaming to get the microbatch end offsets to the checkpoint in our
>>> external checkpoint store ?"
>>>
>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>> achieve similar functionality:
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>  wrote:
>>>
 Hello,

 what options are you considering yourself?

 On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
 adas...@guidewire.com> wrote:


 Hello,

 We are on Spark 3.x and using Spark dstream + kafka and planning to use
 structured streaming + Kafka.
 Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ? Thanks in advance.

 Regards




Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Thanks Das, Mich.

Mich,
We process data from Kafka and write it to S3 in Parquet format using
Dstreams. To ensure exactly-once delivery and prevent data loss, our
process records micro-batch offsets to an external storage at the end of
each micro-batch in foreachRDD, which is then used when the job restarts.

Das,
Thanks for sharing the details. I will look into them.
Unfortunately, the listener processing is async and can't guarantee a
happens-before association with the micro-batch when committing offsets to
external storage. Still, they will work. Is there a way to access
lastProgress in foreachBatch?
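
One workaround sketch (Scala; not from the thread, and the source options and
paths are placeholders): hold on to the StreamingQuery handle returned by
start() so the foreachBatch closure can read lastProgress from it. Note the
caveat in the comments: progress for a batch is only published after the batch
completes, so inside foreachBatch you normally see the previous batch's progress.

```code
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

val spark = SparkSession.builder().getOrCreate()

// Holder so the foreachBatch closure can reach the query handle after start().
val queryRef = new AtomicReference[StreamingQuery]()

val writer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // example source options
  .option("subscribe", "topic_name")
  .load()
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // May be null for the very first batch (start() might not have returned yet),
    // and lastProgress here normally describes the *previous* completed batch.
    val prev = Option(queryRef.get()).flatMap(q => Option(q.lastProgress))
    prev.foreach(p => println(s"in batch $batchId, lastProgress is for batch ${p.batchId}"))
  }
  .option("checkpointLocation", "/path/to/checkpoint")  // example path

queryRef.set(writer.start())
```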


On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
> use the QueryProgressListener
> .
> Both of these approaches gives you access to the SourceProgress
> 
> which gives Kafka offsets as a JSON string.
>
> Hope this helps!
>
> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK to understand better your current model relies on streaming data input
>> through Kafka topic, Spark does some ETL and you send to a sink, a
>> database for file storage like HDFS etc?
>>
>> Your current architecture relies on Direct Streams (DStream) and RDDs and
>> you want to move to Spark sStructured Streaming based on dataframes and
>> datasets?
>>
>> You have not specified your sink
>>
>> With regard to your question?
>>
>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>> streaming to get the microbatch end offsets to the checkpoint in our
>> external checkpoint store ?"
>>
>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>> Structured Streaming. However, Structured Streaming provides mechanisms to
>> achieve similar functionality:
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> 
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von
>> Braun
>> 
>> )".
>>
>>
>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>  wrote:
>>
>>> Hello,
>>>
>>> what options are you considering yourself?
>>>
>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>> adas...@guidewire.com> wrote:
>>>
>>>
>>> Hello,
>>>
>>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>>> structured streaming + Kafka.
>>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>>> to get the microbatch end offsets to the checkpoint in our external
>>> checkpoint store ? Thanks in advance.
>>>
>>> Regards
>>>
>>>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
If you want to find what offset ranges are present in a microbatch in
Structured Streaming, you have to look at the StreamingQuery.lastProgress or
use the QueryProgressListener
.
Both of these approaches give you access to the SourceProgress

which gives Kafka offsets as a JSON string.

Hope this helps!
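
For concreteness, a small sketch of pulling those offsets out (Scala; `query`
is the handle returned by start(), and the helper name is made up):

```code
import org.apache.spark.sql.streaming.StreamingQuery

// lastProgress is null until the first micro-batch has completed.
def latestKafkaOffsets(query: StreamingQuery): Option[String] =
  Option(query.lastProgress).map { progress =>
    // one SourceProgress per source; for Kafka, endOffset is a JSON string
    // of the form {"topic_name":{"0":42,"1":17}}
    progress.sources.map(_.endOffset).mkString("[", ",", "]")
  }
```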

On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh 
wrote:

> OK to understand better your current model relies on streaming data input
> through Kafka topic, Spark does some ETL and you send to a sink, a
> database for file storage like HDFS etc?
>
> Your current architecture relies on Direct Streams (DStream) and RDDs and
> you want to move to Spark sStructured Streaming based on dataframes and
> datasets?
>
> You have not specified your sink
>
> With regard to your question?
>
> "Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ?"
>
> There is not a direct equivalent of DStream HasOffsetRanges in Spark
> Structured Streaming. However, Structured Streaming provides mechanisms to
> achieve similar functionality:
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>  wrote:
>
>> Hello,
>>
>> what options are you considering yourself?
>>
>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>> adas...@guidewire.com> wrote:
>>
>>
>> Hello,
>>
>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>> structured streaming + Kafka.
>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>> to get the microbatch end offsets to the checkpoint in our external
>> checkpoint store ? Thanks in advance.
>>
>> Regards
>>
>>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
OK, to understand better: your current model relies on streaming data input
through a Kafka topic, Spark does some ETL and you send the result to a sink, a
database or file storage like HDFS etc.?

Your current architecture relies on Direct Streams (DStream) and RDDs, and
you want to move to Spark Structured Streaming based on dataframes and
datasets?

You have not specified your sink

With regard to your question:

"Is there an equivalent of Dstream HasOffsetRanges in structure streaming
to get the microbatch end offsets to the checkpoint in our external
checkpoint store ?"

There is not a direct equivalent of DStream HasOffsetRanges in Spark
Structured Streaming. However, Structured Streaming provides mechanisms to
achieve similar functionality:

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> what options are you considering yourself?
>
> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
> adas...@guidewire.com> wrote:
>
>
> Hello,
>
> We are on Spark 3.x and using Spark dstream + kafka and planning to use
> structured streaming + Kafka.
> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ? Thanks in advance.
>
> Regards
>
>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread ashok34...@yahoo.com.INVALID
 Hello,
what options are you considering yourself?
On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari 
 wrote:  
 
 Hello,

We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to
Structured Streaming + Kafka. Is there an equivalent of DStream HasOffsetRanges
in Structured Streaming to get the micro-batch end offsets to checkpoint in
our external checkpoint store? Thanks in advance.
Regards