Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Mich Talebzadeh
I agree with what is stated. This is the gist of my understanding having
tested it.
When working with Spark Structured Streaming, each streaming query runs in
its own separate Spark session to ensure isolation and avoid conflicts
between different queries.
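
As a quick illustration (just a sketch with illustrative names, not the code
below), the session attached to the micro-batch DataFrame inside foreachBatch
is a clone, distinct from the notebook-level session:

def check_session(batch_df, batch_id):
    # The DataFrame passed to foreachBatch carries the session cloned for this
    # streaming query, so it is generally not the same object as the
    # notebook-level `spark` session.
    print(batch_df.sparkSession is spark)
    # A temp view registered here lives in that cloned session, so query it
    # through batch_df.sparkSession rather than through `spark`.
    batch_df.createOrReplaceTempView("probe_view")
    batch_df.sparkSession.sql("SELECT COUNT(1) FROM probe_view").show()
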
So here I have:

def process_data(self, df: F.DataFrame, batchId: int) -> None:
    if len(df.take(1)) > 0:
        df.select(col("timestamp"), col("value"), col("rowkey"), col("ID"),
                  col("CLUSTERED"), col("op_time")).show(1, False)
        df.createOrReplaceTempView("tmp_view")
        try:
            rows = df.sparkSession.sql("SELECT COUNT(1) FROM tmp_view").collect()[0][0]
            print(f"Number of rows: {rows}")
        except Exception as e:
            logging.error(f"Error counting rows: {e}")
    else:
        logging.warning("DataFrame is empty")

Here, df.sparkSession accesses the Spark session associated with the
streaming DataFrame 'df', which is the session the temp view is registered on.

+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|timestamp              |value   |rowkey                              |ID      |CLUSTERED|op_time            |
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|2024-01-31 20:31:24.152|25754740|df4d864d-517d-4f59-8f9e-bd1e7cd9678f|25754740|2575473.9|2024-01-31 20:31:30|
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
only showing top 1 row

rows is 50

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 31 Jan 2024 at 13:30, Karthick Nk  wrote:

> Hi Team,
>
> I am using Structured Streaming in PySpark on Azure Databricks, and I am
> creating a temp view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to run Spark SQL query
> transformations.
> There I am facing the issue that the temp_view is not found, so as a
> workaround I have created a global temp view instead.
> But when I try the same without streaming, I am able to use the temp_view.
>
>
> write_to_final_table = (
>     spark.readStream.format('delta')
>     .option('ignoreChanges', True)
>     .table(f"{delta_table_name}")
>     .writeStream
>     .queryName(f"{query_name}")
>     .format("org.elasticsearch.spark.sql")
>     .trigger(processingTime='1 minutes')
>     .outputMode("append")
>     .foreachBatch(process_micro_batch)
>     .option("checkpointLocation", checkpointdirectory_path)
>     .option("mergeSchema", "true")
>     .option("failOnDataLoss", "false")
>     .start()
> )
>
>
> def process_micro_batch(micro_batch_df, batchId):
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql("select * from temp_view")
>     return df
>
> Here I am getting a "temp_view not found" error while reading data from
> temp_view.
>
>
> I need to create a temp view (not a global temp view) based on the
> dataframe, and to perform the Spark SQL transformation in Structured
> Streaming.
>
> I have a few questions:
> 1. Do Structured Streaming and spark.sql have different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp view based on the dataframe and perform
> Spark SQL operations on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
>


Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Mich Talebzadeh
hm.

In your logic here

def process_micro_batch(micro_batch_df, batchId):
    micro_batch_df.createOrReplaceTempView("temp_view")
    df = spark.sql("select * from temp_view")
    return df

Is this function called, and if so, do you check whether micro_batch_df
contains rows, i.e. if len(micro_batch_df.take(1)) > 0:

something like

# Modified process_data function to check for external trigger
def process_data(batch_df: F.DataFrame, batchId: int) -> None:
    if len(batch_df.take(1)) > 0:
        # Check for external event trigger
        if listen_for_external_event():
            # Assuming 'api_data' is a list of dictionaries obtained from the
            # API in each batch
            api_data = get_api_data()
            if api_data:
                dfAPI = spark_session.createDataFrame(api_data, schema=data_schema)
                dfAPI = dfAPI \
                    .withColumn("op_type", lit(udfs.op_type_api_udf())) \
                    .withColumn("op_time", udfs.timestamp_udf(current_timestamp()))
                dfAPI.show(10, False)
            else:
                logging.warning("Error getting API data.")
        else:
            logging.info("No external trigger received.")
    else:
        logging.warning("DataFrame is empty")

# Streaming DataFrame creation:
# construct a streaming DataFrame that reads from the "rate" source
"""
This creates a streaming DataFrame by subscribing to a rate source.
It simulates a stream by generating data at a specified rate (rowsPerSecond).
"""
streamingDataFrame = spark_session.readStream.format("rate") \
    .option("rowsPerSecond", 100) \
    .option("subscribe", "rate") \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load()

# Generate a unique query name by appending a timestamp
query_name = f"{appName}_{int(time.time())}"
logging.info(query_name)

# Main loop to continuously check for events
while True:
    # Start the streaming query only if an external event is received
    if listen_for_external_event():
        query_name = f"{appName}_{int(time.time())}"
        logging.info(query_name)
        result_query = (
            streamingDataFrame.writeStream
            .outputMode('append')
            .option("truncate", "false")
            .foreachBatch(lambda df, batchId: process_data(df, batchId))
            .trigger(processingTime=f'{processingTime} seconds')
            .option('checkpointLocation', checkpoint_path)
            .queryName(f"{query_name}")
            .start()
        )
        break  # Exit the loop after starting the streaming query
    else:
        time.sleep(5)  # Sleep for a while before checking for the next event

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 31 Jan 2024 at 13:30, Karthick Nk  wrote:

> Hi Team,
>
> I am using Structured Streaming in PySpark on Azure Databricks, and I am
> creating a temp view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to run Spark SQL query
> transformations.
> There I am facing the issue that the temp_view is not found, so as a
> workaround I have created a global temp view instead.
> But when I try the same without streaming, I am able to use the temp_view.
>
>
> write_to_final_table = (
>     spark.readStream.format('delta')
>     .option('ignoreChanges', True)
>     .table(f"{delta_table_name}")
>     .writeStream
>     .queryName(f"{query_name}")
>     .format("org.elasticsearch.spark.sql")
>     .trigger(processingTime='1 minutes')
>     .outputMode("append")
>     .foreachBatch(process_micro_batch)
>     .option("checkpointLocation", checkpointdirectory_path)
>     .option("mergeSchema", "true")
>     .option("failOnDataLoss", "false")
>     .start()
> )
>
>
> def process_micro_batch(micro_batch_df, batchId):
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql("select * from temp_view")
>     return df
>
> Here I am getting a "temp_view not found" error while reading data from
> temp_view.
>
>
> I need to create a temp view (not a global temp view) based on the
> dataframe, and to perform the Spark SQL transformation in Structured
> Streaming.
>
> I have a few questions:
> 1. Do Structured Streaming and spark.sql have different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp view based on the dataframe and perform
> Spark SQL operations on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
>

Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Jungtaek Lim
Hi,

A streaming query clones the Spark session - when you create a temp view
from the DataFrame, the temp view is created under the cloned session. You
will need to use micro_batch_df.sparkSession to access the cloned session.
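
For example, a minimal sketch of that fix, keeping the function and view
names from the code above, would be:

def process_micro_batch(micro_batch_df, batchId):
    # Register the temp view on the session cloned for this streaming query
    micro_batch_df.createOrReplaceTempView("temp_view")
    # Query through the same cloned session rather than the notebook-level spark
    df = micro_batch_df.sparkSession.sql("select * from temp_view")
    return df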

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Jan 31, 2024 at 3:29 PM Karthick Nk  wrote:

> Hi Team,
>
> I am using Structured Streaming in PySpark on Azure Databricks, and I am
> creating a temp view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to run Spark SQL query
> transformations.
> There I am facing the issue that the temp_view is not found, so as a
> workaround I have created a global temp view instead.
> But when I try the same without streaming, I am able to use the temp_view.
>
>
> write_to_final_table = (
>     spark.readStream.format('delta')
>     .option('ignoreChanges', True)
>     .table(f"{delta_table_name}")
>     .writeStream
>     .queryName(f"{query_name}")
>     .format("org.elasticsearch.spark.sql")
>     .trigger(processingTime='1 minutes')
>     .outputMode("append")
>     .foreachBatch(process_micro_batch)
>     .option("checkpointLocation", checkpointdirectory_path)
>     .option("mergeSchema", "true")
>     .option("failOnDataLoss", "false")
>     .start()
> )
>
>
> def process_micro_batch(micro_batch_df, batchId):
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql("select * from temp_view")
>     return df
>
> Here I am getting a "temp_view not found" error while reading data from
> temp_view.
>
>
> I need to create a temp view (not a global temp view) based on the
> dataframe, and to perform the Spark SQL transformation in Structured
> Streaming.
>
> I have a few questions:
> 1. Do Structured Streaming and spark.sql have different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp view based on the dataframe and perform
> Spark SQL operations on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
>


Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-30 Thread Karthick Nk
Hi Team,

I am using Structured Streaming in PySpark on Azure Databricks, and I am
creating a temp view from a dataframe
(df.createOrReplaceTempView('temp_view')) in order to run Spark SQL query
transformations.
There I am facing the issue that the temp_view is not found, so as a
workaround I have created a global temp view instead.
But when I try the same without streaming, I am able to use the temp_view.


write_to_final_table = (
    spark.readStream.format('delta')
    .option('ignoreChanges', True)
    .table(f"{delta_table_name}")
    .writeStream
    .queryName(f"{query_name}")
    .format("org.elasticsearch.spark.sql")
    .trigger(processingTime='1 minutes')
    .outputMode("append")
    .foreachBatch(process_micro_batch)
    .option("checkpointLocation", checkpointdirectory_path)
    .option("mergeSchema", "true")
    .option("failOnDataLoss", "false")
    .start()
)


def process_micro_batch(micro_batch_df, batchId):
    micro_batch_df.createOrReplaceTempView("temp_view")
    df = spark.sql("select * from temp_view")
    return df

Here I am getting a "temp_view not found" error while reading data from
temp_view.


I need to create a temp view (not a global temp view) based on the
dataframe, and to perform the Spark SQL transformation in Structured
Streaming.

I have a few questions:
1. Do Structured Streaming and spark.sql have different Spark contexts
within the same Databricks notebook?
2. If I want to create a temp view based on the dataframe and perform
Spark SQL operations on it, how can I create the temp view (not a global
temp view, since a global temp view is available at the cluster level
across all notebooks)?

Thanks & Regards