Hmm. In your logic here:
def process_micro_batch(micro_batch_df, batchId) : micro_batch_df.createOrReplaceTempView("temp_view") df = spark.sql(f"select * from temp_view") return df Is this function called and if so do you check if micro_batch_df contains rows -> if len(micro_batch_df.take(1)) > 0: something like # Modified process_data function to check for external trigger def process_data(batch_df: F.DataFrame, batchId: int) -> None: *if len(batch_df.take(1)) > 0:* # Check for external event trigger if listen_for_external_event(): # Assuming 'data' is a list of dictionaries obtained from the API in each batch api_data = get_api_data() if api_data: dfAPI = spark_session.createDataFrame(api_data, schema=data_schema) dfAPI = dfAPI \ .withColumn("op_type", lit(udfs.op_type_api_udf())) \ .withColumn("op_time", udfs.timestamp_udf(current_timestamp())) dfAPI.show(10, False) else: logging.warning("Error getting API data.") else: logging.info("No external trigger received.") *else:* * logging.warning("DataFrame is empty")* # Streaming DataFrame Creation: # construct a streaming dataframe that subscribes to topic rate for data """ This creates a streaming DataFrame by subscribing to a rate source. It simulates a stream by generating data at a specified rate (rowsPerSecond). 
""" streamingDataFrame = spark_session.readStream.format("rate") \ .option("rowsPerSecond", 100) \ .option("subscribe", "rate") \ .option("failOnDataLoss", "false") \ .option("includeHeaders", "true") \ .option("startingOffsets", "latest") \ .load() # Generate a unique query name by appending a timestamp query_name = f"{appName}_{int(time.time())}" logging.info(query_name) # Main loop to continuously check for events while True: # Start the streaming query only if an external event is received if listen_for_external_event(): query_name = f"{appName}_{int(time.time())}" logging.info(query_name) result_query = ( streamingDataFrame.writeStream .outputMode('append') .option("truncate", "false") .foreachBatch(lambda df, batchId: process_data(df, batchId)) .trigger(processingTime=f'{processingTime} seconds') .option('checkpointLocation', checkpoint_path) .queryName(f"{query_name}") .start() ) break # Exit the loop after starting the streaming query else: time.sleep(5) # Sleep for a while before checking for the next event HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 31 Jan 2024 at 13:30, Karthick Nk <kcekarth...@gmail.com> wrote: > Hi Team, > > I am using structered streaming in pyspark in azure Databricks, in that I > am creating temp_view from dataframe > (df.createOrReplaceTempView('temp_view')) for performing spark sql query > transformation. 
> In that I am facing the issue that the temp_view is not found, so as a > workaround I have created a global temp_view to use. > But when I have tried the same without streaming, I am able to > query the temp_view. > > > write_to_final_table = > > (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1 > minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema", > "true").option("failOnDataLoss", "false").start() > > > def process_micro_batch(micro_batch_df, batchId) : > micro_batch_df.createOrReplaceTempView("temp_view") > df = spark.sql(f"select * from temp_view") > return df > > Here, I am getting a "temp_view not found" error while reading data from > the temp_view. > > > I need to create a temp_view (*not a global temp_view*) based on the > dataframe, and need to perform the spark sql transformation in structured > streaming. > > I have a few questions: > 1. Will structured streaming and spark.sql have different > SparkContexts within the same Databricks notebook? > 2. If I want to create a temp_view based on the dataframe and need to > perform the spark sql operation, how can I create the temp view (not a global > temp view, since a global temp view will be available at the cluster level > across all the notebooks)? > > Thanks & Regards >