Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
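
To make the foreach half of that section concrete: with foreach, Spark calls
your function once for every output row and passes that row as the single
argument (a pyspark.sql.Row in PySpark). Below is a minimal sketch, assuming a
streaming DataFrame with "ticker" and "price" columns as in the thread. The
names transform_row, process_row and start_foreach_query are hypothetical, and
the upper-casing is placeholder logic rather than anything from the original
question.

```python
def transform_row(row):
    """Per-row transformation: build a new dict from a Row-like object.

    `row` is assumed to behave like pyspark.sql.Row, i.e. to offer asDict().
    The upper-casing and float conversion are placeholder logic.
    """
    record = row.asDict()
    record["ticker"] = str(record["ticker"]).upper()
    record["price"] = float(record["price"])
    return record


def process_row(row):
    """Function handed to foreach(); Spark calls it once per output row."""
    record = transform_row(row)
    # Placeholder side effect; on a cluster this prints to the executor logs.
    print(record)


def start_foreach_query(streaming_df):
    """Attach process_row to a streaming DataFrame (assumed) and start the query."""
    return (
        streaming_df.writeStream
        .outputMode("append")
        .foreach(process_row)
        .start()
    )
```

Keeping the transformation in its own function (transform_row) means it can be
exercised with a plain Row-like object before it is attached to a stream.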

The book from Dr. Zaharia on Spark does a fantastic job of explaining the
fundamental thinking behind these concepts.


Regards,
Gourav Sengupta



On Wed, Feb 9, 2022 at 8:51 PM karan alang <karan.al...@gmail.com> wrote:

> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang
>
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> BTW, you can check this LinkedIn article of mine on Processing Change
>> Data Capture with Spark Structured Streaming
>> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>>
>>
>> It covers the concept of triggers, including trigger(once=True), the
>> one-time batch in Spark Structured Streaming.
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Thanks, Mich .. that worked fine!
>>>
>>>
>>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> read below
>>>>
>>>>             """
>>>>             "foreach" performs custom write logic on each row, and
>>>>             "foreachBatch" performs custom write logic on each micro-batch,
>>>>             here through the SendToBigQuery function.
>>>>             The function passed to foreachBatch receives 2 parameters:
>>>>             first, the micro-batch as a DataFrame or Dataset, and second,
>>>>             a unique id for each batch (batchId).
>>>>             Using foreachBatch, we write each micro-batch to the storage
>>>>             defined in our custom logic. In this case, we store the output
>>>>             of our streaming application in a Google BigQuery table.
>>>>             Note that we are appending data, and column "rowkey" is
>>>>             defined as a UUID so it can be used as the primary key.
>>>>             """
>>>>             # col comes from pyspark.sql.functions
>>>>             result = (
>>>>                 streamingDataFrame.select(
>>>>                     col("parsed_value.rowkey").alias("rowkey"),
>>>>                     col("parsed_value.ticker").alias("ticker"),
>>>>                     col("parsed_value.timeissued").alias("timeissued"),
>>>>                     col("parsed_value.price").alias("price"),
>>>>                 )
>>>>                 .writeStream
>>>>                 .outputMode("append")
>>>>                 .option("truncate", "false")
>>>>                 .foreachBatch(SendToBigQuery)  # called once per micro-batch
>>>>                 .trigger(processingTime="2 seconds")
>>>>                 .start()
>>>>             )
>>>>
>>>> Now define your function SendToBigQuery():
>>>>
>>>> def SendToBigQuery(df, batchId):
>>>>     # df is the micro-batch as a regular DataFrame; batchId is its unique id
>>>>     if len(df.take(1)) > 0:
>>>>         df.printSchema()
>>>>         print(f"batchId is {batchId}")
>>>>         rows = df.count()
>>>>         print(f"Total records processed in this run = {rows}")
>>>>         # ...... (custom write logic elided in the original)
>>>>     else:
>>>>         print("DataFrame is empty")
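
On the question of reaching individual rows from foreachBatch: the first
argument is an ordinary (non-streaming) DataFrame, so the usual DataFrame calls
apply to it. Below is a minimal sketch, assuming the micro-batch is small
enough to bring to the driver with collect(). The names rows_from_batch and
process_batch are hypothetical and are not part of the code quoted above.

```python
def rows_from_batch(batch_df):
    """Return the rows of one micro-batch as plain dicts.

    Assumes batch_df behaves like a pyspark.sql.DataFrame. collect() pulls
    every row to the driver, so this only suits small micro-batches.
    """
    return [row.asDict() for row in batch_df.collect()]


def process_batch(batch_df, batch_id):
    """Function handed to foreachBatch(); called once per micro-batch."""
    records = rows_from_batch(batch_df)
    for record in records:
        # Per-row logic goes here; printing is a placeholder.
        print(f"batch {batch_id}: {record}")
    # Spark ignores the return value; it is returned only to ease testing.
    return len(records)
```

It would be attached with writeStream.foreachBatch(process_batch). For larger
batches, DataFrame-level transformations (select, withColumn) inside the
function are usually a better fit than collect().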
>>>>
>>>> *HTH*
>>>>
>>>>
>>>>
>>>> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I'm using Structured Streaming to read data from Kafka, and need to
>>>>> transform each individual row.
>>>>>
>>>>> I'm trying to use 'foreach' (or foreachBatch) and running into issues.
>>>>> Basic question: how is the row passed to the function when foreach is
>>>>> used?
>>>>>
>>>>> Also, when I use foreachBatch, it seems the batchId is available in the
>>>>> called function. How do I access individual rows?
>>>>>
>>>>> Details are on Stack Overflow:
>>>>>
>>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>>
>>>>> What is the best approach for this use case?
>>>>>
>>>>> tia!
>>>>>
>>>>
