It might be good to first split the stream up into smaller streams, one per
type.  If ordering of the Kafka records is important, then you could
partition them at the source based on the type, but be careful how you
configure Spark to read from Kafka as that could also influence ordering.

    kdf = (spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        ...
        .option("includeHeaders", "true")
        .option("startingOffsets", "earliest")  # see below where we
trigger with availableNow and checkpoint
        .option("subscribe", "topicName")  # the name of the topic
        .load())

    kdf = kdf.selectExpr("CAST(key AS STRING)",
                     "CAST(value AS STRING)",
                     "headers",
                     "CAST(topic AS STRING)",
                     "CAST(partition AS STRING)",
                     "CAST(offset AS STRING)")

    kdf_one = kdf.filter(kdf.type == 'one')
    kdf_two = kdf.filter(kdf.type == 'two')
    kdf_three = kdf.filter(kdf.type == 'three')

then transform each one as you need to:

kdf_one = kdf.transform(prepare_for_database_one)

and start each DataFrame and use foreachBatch to store the data in the DB:

    (kdf_one
           .writeStream
           .queryName("one")
           .foreachBatch(saveastable_one)
           .trigger(availableNow=True)
           .option("checkpointLocation", "s3a://checkpointlocation/")  #
very important to be on writeStream!
           .start()
           .awaitTermination())


On Wed, 10 Jan 2024 at 21:01, Khalid Mammadov <khalidmammad...@gmail.com>
wrote:

> Use foreachBatch or foreach methods:
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> On Wed, 10 Jan 2024, 17:42 PRASHANT L, <prashant...@gmail.com> wrote:
>
>> Hi
>> I have a use case where I need to process json payloads coming from Kafka
>> using structured streaming , but thing is json can have different formats ,
>> schema is not fixed
>> and each json will have a @type tag so based on tag , json has to be
>> parsed and loaded to table with tag name  , and if a json has nested sub
>> tags , those tags shd go to different table
>> so I need to process each json record individually , and determine
>> destination tables what would be the best approach
>>
>>
>>> *{*
>>> *    "os": "andriod",*
>>> *    "type": "mobile",*
>>> *    "device": {*
>>> *        "warrenty": "3 years",*
>>> *        "replace": "yes"*
>>> *    },*
>>> *    "zones": [*
>>> *        {*
>>> *            "city": "Bangalore",*
>>> *            "state": "KA",*
>>> *            "pin": "577401"*
>>> *        },*
>>> *        {*
>>> *            "city": "Mumbai",*
>>> *            "state": "MH",*
>>> *            "pin": "576003"*
>>> *        }*
>>> *    ],*
>>> *    "@table": "product"**}*
>>
>>
>> so for the above json , there are 3 tables created
>> 1. Product (@type) THis is a parent table
>> 2.  poduct_zones and product_devices , child table
>>
>

-- 
_______________________
Dr Ant Kutschera

Reply via email to