Re: StructuredStreaming - foreach/foreachBatch
Thanks, Gourav - will check out the book.

regds,
Karan Alang
Re: StructuredStreaming - foreach/foreachBatch
Hello Gourav, I'll read this document. Thanks.
Re: StructuredStreaming - foreach/foreachBatch
Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

The book on Spark by Dr. Zaharia does a fantastic job of explaining the fundamental thinking behind these concepts.

Regards,
Gourav Sengupta
Re: StructuredStreaming - foreach/foreachBatch
Thanks, Mich .. will check it out

regds,
Karan Alang
Re: StructuredStreaming - foreach/foreachBatch
BTW, you can check this LinkedIn article of mine on Processing Change Data Capture with Spark Structured Streaming:
https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D

It covers the concept of triggers, including trigger(once=True), i.e. one-time batch processing, in Spark Structured Streaming.

HTH

View my LinkedIn profile:
https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
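[Editorial note: a minimal sketch of how the one-time trigger mentioned above might be wired into a writeStream. This assumes an existing `streamingDataFrame`, a running Spark session, and a hypothetical checkpoint path, so it is a configuration sketch rather than a self-contained runnable example.]

```python
# Sketch only: `streamingDataFrame` and the checkpoint path are assumptions.
query = (streamingDataFrame
         .writeStream
         .format("console")
         .outputMode("append")
         .option("checkpointLocation", "/tmp/ckpt")  # hypothetical path
         .trigger(once=True)  # process all data available now, then stop
         .start())
query.awaitTermination()
```

With trigger(once=True) the query behaves like a scheduled incremental batch job: it drains whatever input is available at start-up and then terminates, instead of running continuously on a processingTime interval.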
Re: StructuredStreaming - foreach/foreachBatch
Thanks, Mich .. that worked fine!
Re: StructuredStreaming - foreach/foreachBatch
Read below.

"foreach" performs custom write logic on each row, and "foreachBatch" performs custom write logic on each micro-batch, here through the SendToBigQuery function. foreachBatch(SendToBigQuery) expects a function taking 2 parameters, first: the micro-batch as a DataFrame or Dataset, and second: the unique id for each batch --> batchId. Using foreachBatch, we write each micro-batch to the storage defined in our custom logic. In this case, we store the output of our streaming application in a Google BigQuery table. Note that we are appending data, and the column "rowkey" is defined as a UUID so it can be used as the primary key.

    result = streamingDataFrame.select( \
                 col("parsed_value.rowkey").alias("rowkey") \
               , col("parsed_value.ticker").alias("ticker") \
               , col("parsed_value.timeissued").alias("timeissued") \
               , col("parsed_value.price").alias("price")). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(SendToBigQuery). \
             trigger(processingTime='2 seconds'). \
             start()

Now you define your function SendToBigQuery():

    def SendToBigQuery(df, batchId):
        if len(df.take(1)) > 0:
            df.printSchema()
            print(f"batchId is {batchId}")
            rows = df.count()
            print(f"Total records processed in this run = {rows}")
            ..
        else:
            print("DataFrame is empty")

HTH
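[Editorial note: to make the two calling conventions concrete, here is a plain-Python mock of how the callbacks are shaped, not Spark itself. With foreach, the engine invokes your function once per row; with foreachBatch, it invokes your function once per micro-batch with (batch_df, batchId). The dict "rows", the function names, and the ticker values below are all illustrative stand-ins.]

```python
# Plain-Python mock of the foreach vs. foreachBatch callback shapes.
# Dicts stand in for Spark Row objects; lists stand in for micro-batch DataFrames.

def process_row(row, sink):
    """foreach-style callback: invoked once per row."""
    sink.append(row["ticker"])

def process_batch(batch_rows, batch_id, sink):
    """foreachBatch-style callback: invoked once per micro-batch,
    with the batch data and its unique batchId."""
    sink.append((batch_id, len(batch_rows)))

micro_batches = [
    [{"ticker": "IBM"}, {"ticker": "MSFT"}],  # batchId 0
    [{"ticker": "ORCL"}],                     # batchId 1
]

row_sink, batch_sink = [], []
for batch_id, batch in enumerate(micro_batches):
    process_batch(batch, batch_id, batch_sink)  # foreachBatch: whole batch + id
    for row in batch:
        process_row(row, row_sink)              # foreach: one row at a time

print(row_sink)    # ['IBM', 'MSFT', 'ORCL']
print(batch_sink)  # [(0, 2), (1, 1)]
```

Inside a real foreachBatch callback the first argument is an ordinary DataFrame, so individual rows are reached the usual way, e.g. df.collect() or df.foreach(...), rather than through the batchId.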
StructuredStreaming - foreach/foreachBatch
Hello All,

I'm using Structured Streaming to read data from Kafka, and need to do a transformation on each individual row.

I'm trying to use 'foreach' (or foreachBatch), and running into issues. Basic question - how is the row passed to the function when foreach is used?

Also, when I use foreachBatch, it seems the batchId is available in the function called? How do I access individual rows?

Details are on Stack Overflow:
https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working

What is the best approach for this use-case?

tia!