Hi,

Let us revisit the approach. As some fellow members correctly highlighted,
this is a use case for Spark Structured Streaming, and there are three key
concepts I will mention:


   - foreach: A method for applying custom write logic to each individual
   row in a streaming DataFrame or Dataset.
   - foreachBatch: A method for applying custom write logic to entire
   micro-batches of data, providing more flexibility for complex operations
   (a short sketch of how both are attached to a query follows this list).
   - sendToSink (my chosen name here for the custom logic): A user-defined
   function that encapsulates the logic for writing a micro-batch to a sink
   (in my case the Google BigQuery DW).
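
As a minimal sketch of how the two differ in practice (the class, the
streaming_df name and the queries below are illustrative, not a definitive
implementation): foreach takes a per-row writer, while foreachBatch takes a
function of (DataFrame, batchId).

from pyspark.sql.streaming import DataStreamWriter  # for reference only

# foreach: supply an object (or function) that handles one row at a time
class RowWriter:
    def open(self, partition_id, epoch_id):
        return True            # open a connection per partition/epoch
    def process(self, row):
        pass                   # write a single row to the sink
    def close(self, error):
        pass                   # release the connection

query1 = streaming_df.writeStream.foreach(RowWriter()).start()

# foreachBatch: supply a function that handles a whole micro-batch DataFrame
query2 = streaming_df.writeStream.foreachBatch(sendToSink).start()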


Let us create pseudo code (in Python) for the sendToSink function used in
foreachBatch(sendToSink):

from pyspark.sql.functions import col, explode

# write_to_sink and log_error are user-defined helpers (sketched below)
def sendToSink(df, batchId):

    if len(df.take(1)) > 0:  # Check for empty DataFrame
        table_name = None
        try:
            # Extract the distinct table names from the "@table" column
            table_names = [row["@table"]
                           for row in df.select("@table").distinct().collect()]

            # Iterate through each table name
            for table_name in table_names:
                # Filter the DataFrame for rows belonging to the current table
                table_df = df.filter(col("@table") == table_name)

                # Handle nested structures: write them to their own child tables
                if "zones" in table_df.columns:
                    # Explode the "zones" array into one row per zone
                    zones_df = (table_df
                                .select(explode(col("zones")).alias("zone"))
                                .select("zone.*"))
                    write_to_sink(zones_df, f"{table_name}_zones")

                if "device" in table_df.columns:
                    # Flatten the nested "device" struct into its own DataFrame
                    device_df = table_df.select("device.*")
                    write_to_sink(device_df, f"{table_name}_devices")

                # Write the parent DataFrame (minus nested columns) to its table
                write_to_sink(table_df.drop("zones", "device"), table_name)

        except Exception as e:
            # Log errors gracefully
            log_error(f"Error processing table {table_name} in batch {batchId}: {e}")

    else:
        print("DataFrame is empty")  # Handle empty DataFrame

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 18:51, PRASHANT L <prashant...@gmail.com> wrote:

> Hi
> I have a use case where I need to process JSON payloads coming from Kafka
> using structured streaming, but the thing is the JSON can have different
> formats and the schema is not fixed.
> Each JSON will have a @type tag, so based on the tag the JSON has to be
> parsed and loaded to a table with the tag name, and if a JSON has nested
> sub tags, those tags should go to a different table.
> So I need to process each JSON record individually and determine the
> destination tables. What would be the best approach?
>
>
>> {
>>     "os": "andriod",
>>     "type": "mobile",
>>     "device": {
>>         "warrenty": "3 years",
>>         "replace": "yes"
>>     },
>>     "zones": [
>>         {
>>             "city": "Bangalore",
>>             "state": "KA",
>>             "pin": "577401"
>>         },
>>         {
>>             "city": "Mumbai",
>>             "state": "MH",
>>             "pin": "576003"
>>         }
>>     ],
>>     "@table": "product"
>> }
>
>
> So for the above JSON, there are 3 tables created:
> 1. product (@type): this is the parent table
> 2. product_zones and product_devices: child tables
>
