hm.

In your logic here

def process_micro_batch(micro_batch_df, batchId) :
    micro_batch_df.createOrReplaceTempView("temp_view")
    df = spark.sql(f"select * from temp_view")
    return df

Is this function actually called, and if so, do you check whether
micro_batch_df contains any rows first, e.g.

   if len(micro_batch_df.take(1)) > 0:

something like

# Modified process_data function to check for an external trigger
def process_data(batch_df: DataFrame, batchId: int) -> None:
    if len(batch_df.take(1)) > 0:
        # Check for external event trigger
        if listen_for_external_event():
            # Assuming 'api_data' is a list of dictionaries obtained from
            # the API in each batch
            api_data = get_api_data()
            if api_data:
                dfAPI = spark_session.createDataFrame(api_data, schema=data_schema)
                dfAPI = dfAPI \
                    .withColumn("op_type", lit(udfs.op_type_api_udf())) \
                    .withColumn("op_time", udfs.timestamp_udf(current_timestamp()))
                dfAPI.show(10, False)
            else:
                logging.warning("Error getting API data.")
        else:
            logging.info("No external trigger received.")
    else:
        logging.warning("DataFrame is empty")
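
On the temp view point itself: inside foreachBatch the micro-batch DataFrame
is tied to its own SparkSession, so a view created with
createOrReplaceTempView is resolved through that session, not necessarily
through the driver-level spark object. A minimal sketch, assuming a recent
PySpark where DataFrame.sparkSession is available:

def process_micro_batch(micro_batch_df, batchId):
    # Register the view against the micro-batch DataFrame
    micro_batch_df.createOrReplaceTempView("temp_view")
    # Query it through the session the view was registered in,
    # not the driver-level 'spark'
    result_df = micro_batch_df.sparkSession.sql("select * from temp_view")
    result_df.show(10, False)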

# Streaming DataFrame creation:
# construct a streaming DataFrame from the built-in rate source
"""
This creates a streaming DataFrame by reading from the rate source.
It simulates a stream by generating rows at a specified rate (rowsPerSecond).
"""
# Note: subscribe, startingOffsets, failOnDataLoss and includeHeaders are
# Kafka source options and do not apply to the rate source
streamingDataFrame = spark_session.readStream.format("rate") \
    .option("rowsPerSecond", 100) \
    .load()
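
As a quick sanity check, the rate source emits just two columns (timestamp
and value), and a short-lived console sink is enough to confirm the stream
is flowing. A minimal sketch, not part of the main flow:

# Optional: print a few rows of the rate stream to the console
debug_query = (
    streamingDataFrame.writeStream
        .format("console")
        .outputMode("append")
        .option("numRows", 5)
        .option("truncate", "false")
        .start()
)
debug_query.awaitTermination(10)  # let it run for ~10 seconds
debug_query.stop()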

# Generate a unique query name by appending a timestamp
query_name = f"{appName}_{int(time.time())}"
logging.info(query_name)

# Main loop to continuously check for events
while True:
    # Start the streaming query only if an external event is received
    if listen_for_external_event():
        query_name = f"{appName}_{int(time.time())}"
        logging.info(query_name)
        result_query = (
            streamingDataFrame.writeStream
                .outputMode('append')
                .option("truncate", "false")
                .foreachBatch(lambda df, batchId: process_data(df, batchId))
                .trigger(processingTime=f'{processingTime} seconds')
                .option('checkpointLocation', checkpoint_path)
                .queryName(f"{query_name}")
                .start()
        )
        break  # Exit the loop after starting the streaming query
    else:
        time.sleep(5)  # Sleep for a while before checking for the next event
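
Once the query has started you would normally block the driver on it,
assuming nothing else needs to run in this process:

# Keep the driver alive until the streaming query terminates or fails
result_query.awaitTermination()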

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 31 Jan 2024 at 13:30, Karthick Nk <kcekarth...@gmail.com> wrote:

> Hi Team,
>
> I am using structured streaming in PySpark on Azure Databricks. I am
> creating a temp view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to perform Spark SQL
> transformations.
> There I am facing the issue that the temp view is not found, so as a
> workaround I have created a global temp view instead.
> But when I try the same thing without streaming, I am able to use the
> temp view.
>
>
> write_to_final_table = (
>     spark.readStream.format('delta')
>         .option('ignoreChanges', True)
>         .table(f"{delta_table_name}")
>         .writeStream
>         .queryName(f"{query_name}")
>         .format("org.elasticsearch.spark.sql")
>         .trigger(processingTime='1 minutes')
>         .outputMode("append")
>         .foreachBatch(process_micro_batch)
>         .option("checkpointLocation", checkpointdirectory_path)
>         .option("mergeSchema", "true")
>         .option("failOnDataLoss", "false")
>         .start()
> )
>
>
> def process_micro_batch(micro_batch_df, batchId) :
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql(f"select * from temp_view")
>     return df
>
> Here I am getting a "temp_view not found" error while reading data from
> the temp view.
>
>
> I need to create a temp view (*not* a global temp view) based on the
> dataframe and perform the Spark SQL transformation in structured
> streaming.
>
> I have a few questions:
> 1. Do structured streaming and spark.sql use different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp view based on the dataframe and perform
> Spark SQL operations on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
>
