Hi,

Let us revisit the approach. As some fellow members correctly highlighted, this is a use case for Spark Structured Streaming. There are a few key concepts that I will mention:
- foreach: a method for applying custom write logic to each individual row in a streaming DataFrame or Dataset.
- foreachBatch: a method for applying custom write logic to entire micro-batches of data, providing more flexibility for complex operations.
- sendToSink (my chosen name here for the custom logic): a user-defined function that encapsulates the logic for writing a micro-batch to a sink (in my case the Google BigQuery DW).

Let us create pseudo-code (in Python) for the sendToSink function used in foreachBatch(sendToSink):

    from pyspark.sql.functions import col

    def sendToSink(df, batchId):
        if len(df.take(1)) > 0:  # check for an empty micro-batch
            try:
                # Extract the distinct table names from the "@table" column
                table_names = [row["@table"]
                               for row in df.select("@table").distinct().collect()]
                # Iterate through each table name
                for table_name in table_names:
                    # Filter the rows belonging to the current table
                    table_df = df.filter(col("@table") == table_name)
                    # Handle nested structures for specific tables
                    if table_name in ["product_zones", "product_devices"]:
                        # Extract the nested data (e.g. the "zones" or "device" columns)
                        nested_df = table_df.select("zones", "device")
                        # Write the nested DataFrame to its corresponding table
                        write_to_sink(nested_df, table_name)
                    # Write the main DataFrame to its table
                    write_to_sink(table_df, table_name)
            except Exception as e:
                # Log errors gracefully
                log_error(f"Error processing table {table_name}: {e}")
        else:
            print("DataFrame is empty")  # handle an empty micro-batch

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk.
Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Wed, 10 Jan 2024 at 18:51, PRASHANT L <prashant...@gmail.com> wrote:

> Hi
> I have a use case where I need to process JSON payloads coming from Kafka
> using Structured Streaming, but the thing is the JSON can have different
> formats; the schema is not fixed. Each JSON will have a @table tag, so
> based on that tag the JSON has to be parsed and loaded to the table with
> the tag name, and if a JSON has nested sub-tags, those tags should go to a
> different table. So I need to process each JSON record individually and
> determine the destination tables. What would be the best approach?
>
> {
>   "os": "andriod",
>   "type": "mobile",
>   "device": {
>     "warrenty": "3 years",
>     "replace": "yes"
>   },
>   "zones": [
>     {
>       "city": "Bangalore",
>       "state": "KA",
>       "pin": "577401"
>     },
>     {
>       "city": "Mumbai",
>       "state": "MH",
>       "pin": "576003"
>     }
>   ],
>   "@table": "product"
> }
>
> So for the above JSON, there are 3 tables created:
> 1. product (@table): this is the parent table
> 2. product_zones and product_devices: child tables
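As a complement to the sendToSink pseudo-code above, the per-batch routing can be simulated in plain Python without a Spark cluster, which makes the control flow easy to test. This is a minimal sketch under stated assumptions: the in-memory `sink` dict and the `write_to_sink`/`send_to_sink` names are stand-ins for the real BigQuery writer, not Spark APIs, and the micro-batch is just a list of dicts.

```python
from collections import defaultdict

# In-memory stand-in for the BigQuery sink; a real write_to_sink would
# call the BigQuery connector instead of appending to a dict.
sink = defaultdict(list)

def write_to_sink(rows, table_name):
    sink[table_name].extend(rows)

def send_to_sink(batch, batch_id):
    """Mirror of sendToSink: route each record in a micro-batch by "@table"."""
    if not batch:                       # empty micro-batch
        print("batch is empty")
        return
    # Distinct destination tables present in this batch
    table_names = sorted({rec["@table"] for rec in batch})
    for table_name in table_names:
        # Filter the records belonging to the current table, then "write" them
        table_rows = [rec for rec in batch if rec["@table"] == table_name]
        write_to_sink(table_rows, table_name)

# Simulated micro-batch containing rows for two destination tables
batch = [
    {"@table": "product", "os": "android"},
    {"@table": "product_zones", "city": "Bangalore"},
    {"@table": "product", "os": "ios"},
]
send_to_sink(batch, batch_id=0)
print({t: len(rows) for t, rows in sink.items()})
# {'product': 2, 'product_zones': 1}
```

The same shape of logic carries over to the Spark version: distinct values of "@table" drive a filter per destination, and each filtered subset is handed to the sink writer.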