Hello @Tathagata Das <tathagata.das1...@gmail.com>, could you share your thoughts on https://issues.apache.org/jira/browse/SPARK-48418? Let me know if you have any questions. Thanks.
Regards,
Anil

On Fri, May 24, 2024 at 12:13 AM Anil Dasari <adas...@guidewire.com> wrote:

> It appears that structured streaming and DStreams have entirely different
> microbatch metadata representations.
> Can someone assist me in finding the following DStream microbatch metadata
> equivalents in Structured Streaming?
>
> 1. Microbatch timestamp: structured streaming foreachBatch gives a batchId,
> which is not a timestamp. Is there a way to get the microbatch timestamp?
> 2. Microbatch start event?
> 3. Scheduling delay of a microbatch?
> 4. Pending microbatches in the case of fixed-interval microbatches?
>
> Thanks
>
> On Wed, May 22, 2024 at 5:23 PM Anil Dasari <adas...@guidewire.com> wrote:
>
>> You are right.
>> - Another question on migration: is there a way to get the microbatch id
>> during the microbatch dataset `transform` operation, like in the RDD
>> transform? I am attempting to implement the following pseudo functionality
>> with structured streaming. In this approach, recordCategoriesMetadata is
>> fetched and RDD metrics (RDD size etc.) are emitted using the microbatch
>> id in the transform operation.
>> ```code
>> val rddQueue = new mutable.Queue[RDD[Int]]()
>> // source components
>> val sources = Seq.empty[String]
>> val consolidatedDstream = sources
>>   .map(source => {
>>     val inputStream = ssc.queueStream(rddQueue)
>>     inputStream.transform((rdd, ts) => {
>>       // emit metrics of microbatch ts: rdd size etc.
>>       val recordCategories = rdd.map(..).collect()
>>       val recordCategoriesMetadata = ...
>>       rdd
>>         .map(r => {
>>           val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>>           (source, customRecord)
>>         })
>>     })
>>   })
>>   .reduceLeft(_ union _)
>>
>> consolidatedDstream
>>   .foreachRDD((rdd, ts) => {
>>     // get pipes for each source
>>     val pipes = Seq.empty[String] // pipes of given source
>>     pipes.foreach(pipe => {
>>       val pipeSource = null // get from pipe variable
>>       val psRDD = rdd
>>         .filter {
>>           case (source, sourceRDD) => source.equals(pipeSource)
>>         }
>>       // apply pipe transformation and sink
>>     })
>>   })
>> ```
>>
>> In structured streaming, it can look like:
>>
>> ```code
>> val consolidatedDstream = sources
>>   .map(source => {
>>     val inputStream = ... // (for each source)
>>     inputStream
>>   })
>>   .reduceLeft(_ union _)
>>
>> consolidatedDstream
>>   .writeStream
>>   .foreachBatch((ds, ts) => {
>>     val newDS = ds.transform(internalDS => {
>>       // emit metrics of microbatch ts: rdd size etc.
>>       val recordCategories = internalDS.map(..).collect()
>>       val recordCategoriesMetadata = ...
>>       internalDS
>>         .map(r => {
>>           val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>>           (source, customRecord)
>>         })(... /* <encoder> */)
>>     })
>>     // get pipes for each source
>>     val pipes = Seq.empty[String] // pipes of given source
>>     pipes.foreach(pipe => {
>>       val pipeSource = null // get from pipe variable
>>       val psRDD = newDS
>>         .filter {
>>           case (source, sourceDS) => source.equals(pipeSource)
>>         }
>>       // apply pipe transformation and sink
>>     })
>>   })
>> ```
>> ^ is just pseudo code and I am still not sure if it works. Let me know
>> your suggestions if any. Thanks.
>>
>> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <tathagata.das1...@gmail.com>
>> wrote:
>>
>>> The right way to associate microbatches when committing to external
>>> storage is to use the microbatch id that you can get in foreachBatch. That
>>> microbatch id guarantees that the data produced in the batch is always
>>> the same no matter how many recomputations happen (assuming all processing
>>> logic is deterministic). So you can commit the batch id + batch data
>>> together, and then asynchronously commit the batch id + offsets.
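A minimal sketch of this commit-by-batch-id pattern for a Kafka-to-Parquet job like the one discussed in this thread is shown below. The `ExternalStore` helper, broker address, topic, and S3 paths are placeholders rather than anything from the thread; the offsets half of the commit is sketched further down, next to the listener discussion.

```code
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical external checkpoint store; replace with your own implementation.
object ExternalStore {
  def alreadyCommitted(batchId: Long): Boolean = false // was this batch committed before a restart?
  def commitData(batchId: Long): Unit = ()             // atomically record "data for batchId is written"
}

object KafkaToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-parquet").getOrCreate()

    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "events")                    // placeholder topic
      .load()

    // batchId is stable across recomputations of the same microbatch, so the
    // write + commit below can be made idempotent.
    def writeBatch(batch: DataFrame, batchId: Long): Unit = {
      if (!ExternalStore.alreadyCommitted(batchId)) {
        batch.write
          .mode("append")
          .parquet(s"s3a://bucket/output/batch_id=$batchId") // placeholder path
        ExternalStore.commitData(batchId) // commit batch id + batch data together
      }
    }

    val query = kafkaDf.writeStream
      .foreachBatch(writeBatch _)
      .option("checkpointLocation", "s3a://bucket/checkpoints/kafka-to-parquet") // placeholder
      .start()

    query.awaitTermination()
  }
}
```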
>>>
>>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
>>> wrote:
>>>
>>>> Thanks Das, Mitch.
>>>>
>>>> Mitch,
>>>> We process data from Kafka and write it to S3 in Parquet format using
>>>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>>>> process records micro-batch offsets to external storage at the end of
>>>> each micro-batch in foreachRDD, and those offsets are then used when the
>>>> job restarts.
>>>>
>>>> Das,
>>>> Thanks for sharing the details. I will look into them.
>>>> Unfortunately, the listener callbacks are async and can't guarantee a
>>>> happens-before association with the microbatch when committing offsets to
>>>> external storage. But they will still work. Is there a way to access
>>>> lastProgress in foreachBatch?
>>>>
>>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <tathagata.das1...@gmail.com>
>>>> wrote:
>>>>
>>>>> If you want to find what offset ranges are present in a microbatch in
>>>>> Structured Streaming, you have to look at StreamingQuery.lastProgress or
>>>>> use the StreamingQueryListener
>>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>>>> Both of these approaches give you access to SourceProgress
>>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>>>> which gives the Kafka offsets as a JSON string.
>>>>>
>>>>> Hope this helps!
>>>>>
>>>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh
>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> OK, to understand better: your current model relies on streaming data
>>>>>> input through a Kafka topic, Spark does some ETL, and you send the
>>>>>> output to a sink, a database or file storage like HDFS etc.?
>>>>>>
>>>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>>>> and you want to move to Spark Structured Streaming based on DataFrames
>>>>>> and Datasets?
>>>>>>
>>>>>> You have not specified your sink.
>>>>>>
>>>>>> With regard to your question:
>>>>>>
>>>>>> "Is there an equivalent of the DStream HasOffsetRanges in Structured
>>>>>> Streaming to get the microbatch end offsets and checkpoint them in our
>>>>>> external checkpoint store?"
>>>>>>
>>>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>>>> Structured Streaming. However, Structured Streaming provides mechanisms
>>>>>> to achieve similar functionality.
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>> view my LinkedIn profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>>> that, as with any advice, "one test result is worth one-thousand expert
>>>>>> opinions" (Wernher von Braun
>>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
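One such mechanism, sketched under assumptions: a StreamingQueryListener that records each completed microbatch's Kafka end offsets keyed by batch id, which is also where lastProgress-style information can be captured outside foreachBatch. `OffsetStore` is a placeholder for the external checkpoint store, and, as noted above, the listener fires asynchronously after the batch completes, so it complements rather than replaces the idempotent foreachBatch commit.

```code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Placeholder for the external checkpoint store used elsewhere in this thread.
object OffsetStore {
  def commitOffsets(batchId: Long, sourceDescription: String, endOffsetJson: String): Unit =
    println(s"batch=$batchId source=$sourceDescription endOffset=$endOffsetJson")
}

object OffsetListenerSetup {
  def register(spark: SparkSession): Unit = {
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val progress = event.progress // same object returned by StreamingQuery.lastProgress
        // progress.timestamp (trigger start time) is the closest analogue of the
        // DStream microbatch timestamp asked about earlier in the thread.
        progress.sources.foreach { source =>
          // For the Kafka source, endOffset is a JSON string such as {"events":{"0":123,"1":456}}.
          OffsetStore.commitOffsets(progress.batchId, source.description, source.endOffset)
        }
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })
  }
}
```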
>>>>>>
>>>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> What options are you considering yourself?
>>>>>>>
>>>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari
>>>>>>> <adas...@guidewire.com> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to
>>>>>>> move to Structured Streaming + Kafka.
>>>>>>> Is there an equivalent of the DStream HasOffsetRanges in Structured
>>>>>>> Streaming to get the microbatch end offsets and checkpoint them in our
>>>>>>> external checkpoint store? Thanks in advance.
>>>>>>>
>>>>>>> Regards
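There is no direct HasOffsetRanges equivalent, but the `endOffset` JSON captured above carries the same per-partition information. Below is a minimal parsing sketch that assumes json4s (a library Spark itself depends on); `OffsetRangeEnd` and `toOffsetRanges` are made-up names for illustration.

```code
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Made-up record type: the "until" side of what DStream's HasOffsetRanges exposed.
final case class OffsetRangeEnd(topic: String, partition: Int, untilOffset: Long)

object KafkaOffsetJson {
  // endOffsetJson comes from SourceProgress.endOffset, e.g. {"events":{"0":123,"1":456}}.
  def toOffsetRanges(endOffsetJson: String): Seq[OffsetRangeEnd] = {
    implicit val formats: Formats = DefaultFormats
    parse(endOffsetJson)
      .extract[Map[String, Map[String, Long]]]
      .toSeq
      .flatMap { case (topic, partitions) =>
        partitions.map { case (partition, offset) =>
          OffsetRangeEnd(topic, partition.toInt, offset)
        }
      }
  }
}

// Example:
// KafkaOffsetJson.toOffsetRanges("""{"events":{"0":123,"1":456}}""")
// returns Seq(OffsetRangeEnd("events", 0, 123), OffsetRangeEnd("events", 1, 456))
```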