Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
The book from Dr. Zaharia on Spark does a fantastic job of explaining the
fundamental thinking behind these concepts.

Regards,
Gourav Sengupta

On Wed, Feb 9, 2022 at 8:51 PM karan alang <karan.al...@gmail.com> wrote:

> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang
>
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> BTW you can check this LinkedIn article of mine on Processing Change
>> Data Capture with Spark Structured Streaming
>> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>>
>> It covers the concept of triggers, including trigger(once=True), i.e.
>> one-time batch, in Spark Structured Streaming.
>>
>> HTH
>>
>> view my LinkedIn profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary
>> damages arising from such loss, damage or destruction.
>>
>> On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Thanks, Mich .. that worked fine!
>>>
>>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> read below
>>>>
>>>> """
>>>> "foreach" performs custom write logic on each row, and "foreachBatch"
>>>> performs custom write logic on each micro-batch through the
>>>> SendToBigQuery function.
>>>> foreachBatch(SendToBigQuery) expects 2 parameters, first: the
>>>> micro-batch as a DataFrame or Dataset, and second: a unique id for
>>>> each batch --> batchId.
>>>> Using foreachBatch, we write each micro-batch to the storage defined
>>>> in our custom logic. In this case, we store the output of our
>>>> streaming application in a Google BigQuery table.
>>>> Note that we are appending data, and the column "rowkey" is defined
>>>> as a UUID so it can be used as the primary key.
>>>> """
>>>> result = streamingDataFrame.select( \
>>>>               col("parsed_value.rowkey").alias("rowkey") \
>>>>             , col("parsed_value.ticker").alias("ticker") \
>>>>             , col("parsed_value.timeissued").alias("timeissued") \
>>>>             , col("parsed_value.price").alias("price")). \
>>>>           writeStream. \
>>>>           outputMode('append'). \
>>>>           option("truncate", "false"). \
>>>>           foreachBatch(SendToBigQuery). \
>>>>           trigger(processingTime='2 seconds'). \
>>>>           start()
>>>>
>>>> now you define your function SendToBigQuery():
>>>>
>>>> def SendToBigQuery(df, batchId):
>>>>
>>>>     if len(df.take(1)) > 0:
>>>>         df.printSchema()
>>>>         print(f"""batchId is {batchId}""")
>>>>         rows = df.count()
>>>>         print(f"""Total records processed in this run = {rows}""")
>>>>         ......
>>>>     else:
>>>>         print("DataFrame is empty")
>>>>
>>>> HTH
>>>>
>>>> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I'm using Structured Streaming to read data from Kafka, and need to
>>>>> do a transformation on each individual row.
>>>>>
>>>>> I'm trying to use 'foreach' (or foreachBatch), and running into
>>>>> issues. Basic question - how is the row passed to the function when
>>>>> foreach is used?
>>>>>
>>>>> Also, when I use foreachBatch, it seems the batchId is available in
>>>>> the function called?
>>>>> How do I access individual rows?
>>>>>
>>>>> Details are in stackoverflow:
>>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>>
>>>>> What is the best approach for this use-case?
>>>>>
>>>>> tia!
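[Editor's note] The foreachBatch calling convention discussed in the thread can be sketched as a self-contained, runnable example. Here `FakeBatch` is a hypothetical stand-in for a Spark micro-batch DataFrame, exposing only the `take()` and `count()` calls the handler uses, so the sketch runs without a Spark session; in a real job, `foreachBatch` hands the function an actual DataFrame plus the batch id.

```python
# Minimal sketch of a foreachBatch handler's (df, batchId) convention.
# FakeBatch is a hypothetical stand-in for a micro-batch DataFrame, used
# only so this runs without Spark; the handler logic mirrors the thread.

class FakeBatch:
    """Stand-in exposing the two DataFrame methods the handler calls."""

    def __init__(self, rows):
        self._rows = rows

    def take(self, n):
        # DataFrame.take(n) returns up to n rows as a list.
        return self._rows[:n]

    def count(self):
        # DataFrame.count() returns the number of rows.
        return len(self._rows)


def send_to_bigquery(df, batch_id):
    """Called once per micro-batch: df is the batch, batch_id its id."""
    if len(df.take(1)) > 0:
        rows = df.count()
        # Real write logic (e.g. saving to a BigQuery table) would go here.
        return f"batchId {batch_id}: processed {rows} records"
    return "DataFrame is empty"


print(send_to_bigquery(FakeBatch([{"rowkey": "a"}, {"rowkey": "b"}]), 7))
# batchId 7: processed 2 records
print(send_to_bigquery(FakeBatch([]), 8))
# DataFrame is empty
```

In a real streaming query this function would be wired up exactly as in Mich's snippet, via `.writeStream.foreachBatch(send_to_bigquery)`.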
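[Editor's note] On the "how is the row passed to the function when foreach is used?" question: in PySpark, `writeStream.foreach(f)` calls `f` once per row, passing the `Row` itself, so fields are accessed by name. A minimal sketch, using a `namedtuple` as a hypothetical stand-in for `pyspark.sql.Row` so it runs without a Spark session:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row so the sketch runs without
# Spark; field names match the ticker schema used earlier in the thread.
Row = namedtuple("Row", ["rowkey", "ticker", "price"])


def process_row(row):
    """Receives one row per call; columns are accessed as attributes."""
    # Custom per-row write logic would go here (e.g. an HTTP call or insert).
    return f"{row.rowkey}: {row.ticker} @ {row.price}"


# In a real Structured Streaming job this would be wired up as:
#   streamingDataFrame.writeStream.foreach(process_row).start()
print(process_row(Row("u1", "IBM", 135.2)))
# u1: IBM @ 135.2
```

For per-row setup/teardown (such as opening a connection once per partition), `foreach` also accepts an object with `open(partition_id, epoch_id)`, `process(row)`, and `close(error)` methods, per the Structured Streaming programming guide linked above.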