Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-11 Thread 刘唯
Oh, I see the reason for the confusion.

microbatch_data = event.progress

means that microbatch_data is a StreamingQueryProgress instance, not a
dictionary, so you should use `microbatch_data.processedRowsPerSecond`
instead of the `get` method, which is for dictionaries.

But oddly, query.lastProgress and query.recentProgress should return
StreamingQueryProgress objects, yet they return a dict instead, so the
`get` method works there.

I think PySpark should improve on this part.
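
For illustration, here is a minimal, self-contained sketch of the two access
styles (the rate source, app name and 30-second wait are placeholders, not
part of anyone's original code):

from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

spark = SparkSession.builder.appName("progress-demo").getOrCreate()

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # event.progress is a StreamingQueryProgress object, so use attribute access
        print("processedRowsPerSecond:", event.progress.processedRowsPerSecond)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(MyListener())

df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
query = df.writeStream.format("console").start()
query.awaitTermination(30)

# query.lastProgress, by contrast, currently comes back as a plain dict,
# so the dictionary-style get() works there
last = query.lastProgress
if last is not None:
    print("via lastProgress:", last.get("processedRowsPerSecond"))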

Mich Talebzadeh wrote on Mon, 11 Mar 2024 at 05:51:

> Hi,
>
> Thank you for your advice
>
> This is the amended code
>
>    def onQueryProgress(self, event):
>        print("onQueryProgress")
>        # Access micro-batch data
>        microbatch_data = event.progress
>        # print("microbatch_data received")  # Check if data is received
>        # print(microbatch_data)
>        # processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>        processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
>        print("CPC", processed_rows_per_second)
>        if processed_rows_per_second is not None:  # Check if value exists
>            print("processed_rows_per_second retrieved")
>            print(f"Processed rows per second: {processed_rows_per_second}")
>        else:
>            print("processed_rows_per_second not retrieved!")
>
> This is the output
>
> onQueryStarted
> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
> SLF4J: Defaulting to no-operation MDCAdapter implementation.
> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for
> further details.
> ---
> Batch: 0
> ---
> +---+-+---+---+
> |key|doubled_value|op_type|op_time|
> +---+-+---+---+
> +---+-+---+---+
>
> onQueryProgress
> ---
> Batch: 1
> ---
> ++-+---++
> | key|doubled_value|op_type| op_time|
> ++-+---++
> |a960f663-d13a-49c...|0|  1|2024-03-11 12:17:...|
> ++-+---++
>
> onQueryProgress
> ---
> Batch: 2
> ---
> ++-+---++
> | key|doubled_value|op_type| op_time|
> ++-+---++
> |a960f663-d13a-49c...|2|  1|2024-03-11 12:17:...|
> ++-+---++
>
> I am afraid it is not working. Not even printing anything
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note that,
> as with any advice, quote "one test result is worth one-thousand expert
> opinions" (Werner Von Braun).
>
>
> On Mon, 11 Mar 2024 at 05:07, 刘唯  wrote:
>
>> *now -> not
>>
>> 刘唯 wrote on Sun, 10 Mar 2024 at 22:04:
>>
>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>> Camel case now snake case
>>>
>>> Mich Talebzadeh wrote on Sun, 10 Mar 2024 at 11:46:
>>>

 There is a paper from Databricks on this subject


 https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

 But having tested it, there seems to be a bug there that I reported to
 Databricks forum as well (in answer to a user question)

 I have come to the conclusion that this is a bug. In general, there is a
 bug in obtaining individual values from the dictionary. For example, a
 bug in the way Spark Streaming is populating the processed_rows_per_second
 key within the microbatch_data -> microbatch_data = event.progress
 dictionary, or any other key. I have explored various debugging steps,
 and even though the key seems to exist, the value might not be getting
 set. Note that the dictionary itself prints the elements correctly. This
 is with regard to method onQueryProgress(self, event) in class
 MyListener(StreamingQueryListener):

 For example, with print(microbatch_data), you get everything printed as below

 onQueryProgress
 microbatch_data received
 {
 "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
 "runId" : 

Data ingestion into elastic failing using pyspark

2024-03-11 Thread Karthick Nk
Hi @all,

I am using a PySpark program to write data into an Elasticsearch index using
the upsert operation (sample code snippet below).

def writeDataToES(final_df):
    write_options = {
        "es.nodes": elastic_host,
        "es.net.ssl": "false",
        "es.nodes.wan.only": "true",
        "es.net.http.auth.user": elastic_user_name,
        "es.net.http.auth.pass": elastic_password,
        "es.port": elastic_port,
        "es.net.ssl": "true",
        "es.spark.dataframe.write.null": "true",
        "es.mapping.id": mapping_id,
        "es.write.operation": "upsert"
    }
    final_df.write.format("org.elasticsearch.spark.sql") \
        .options(**write_options) \
        .mode("append") \
        .save(f"{index_name}")


While writing data from the Delta table to the Elasticsearch index, I am
getting an error for a few records (error message below).

*Py4JJavaError: An error occurred while calling o1305.save.*
*: org.apache.spark.SparkException: Job aborted due to stage failure: Task
4 in stage 524.0 failed 4 times, most recent failure: Lost task 4.3 in
stage 524.0 (TID 12805) (192.168.128.16 executor 1):
org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for
bulk operation [1/1]. Error sample (first [5] error messages):*
* org.elasticsearch.hadoop.rest.EsHadoopRemoteException:
illegal_argument_exception: Illegal group reference: group index is missing*
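
One note: "Illegal group reference: group index is missing" is the message
Java's regex Matcher raises when a replacement string contains a '$' that is
not followed by a group number, so a field value containing a literal '$'
that ends up in a server-side regex substitution is one plausible (though
unconfirmed) trigger. A small, hypothetical check, assuming final_df from the
snippet above and at least one string column, to spot such rows:

from functools import reduce
from pyspark.sql import functions as F

# Hypothetical diagnostic: show rows whose string columns contain a literal '$',
# since such values can break regex-based replacements on the Elasticsearch side.
string_cols = [f.name for f in final_df.schema.fields
               if f.dataType.simpleString() == "string"]
has_dollar = reduce(lambda a, b: a | b,
                    [F.col(c).contains("$") for c in string_cols])
final_df.filter(has_dollar).show(truncate=False)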

Could you guide me on this? Am I missing anything?

If you require any additional details, please let me know.

Thanks


Re: Bugs with joins and SQL in Structured Streaming

2024-03-11 Thread Andrzej Zera
Hi,

Do you think there is any chance of these issues getting resolved? Should I
create another bug report? As mentioned in my message, there is one open
already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers
only one of the problems.

Andrzej

On Tue, 27 Feb 2024 at 09:58, Andrzej Zera wrote:

> Hi,
>
> Yes, I tested all of them on spark 3.5.
>
> Regards,
> Andrzej
>
>
> On Mon, 26 Feb 2024 at 23:24, Mich Talebzadeh wrote:
>
>> Hi,
>>
>> These are all on spark 3.5, correct?
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions" (Werner Von Braun).
>>
>>
>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera  wrote:
>>
>>> Hey all,
>>>
>>> I've been using Structured Streaming in production for almost a year now
>>> and I want to share the bugs I have found during this time. I created a
>>> test for each of the issues and put them all here:
>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>
>>> I split the issues into three groups: outer joins on event time,
>>> interval joins and Spark SQL.
>>>
>>> Issues related to outer joins:
>>>
>>>- When joining three or more input streams on event time, if two or
>>>more streams don't contain an event for a join key (which is event time),
>>>no row will be output even if other streams contain an event for this
>>>join key. Tests that check for this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>>and
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>- When joining a stream aggregated in Spark from raw events with a
>>>stream of events already aggregated outside of Spark, no row will be
>>>output if the second stream doesn't contain a corresponding event.
>>>Test that checks for this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>- When joining two aggregated streams (aggregated in Spark), no
>>>result is produced. Test that checks for this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>>I've already reported this one here:
>>>https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
>>>handled yet.
>>>
>>> Issues related to interval joins:
>>>
>>>- When joining three streams (A, B, C) using an interval join on event
>>>time, such that B.eventTime is conditioned on A.eventTime and
>>>C.eventTime is also conditioned on A.eventTime, and then doing window
>>>aggregation based on A's event time, the result is output only after the
>>>watermark crosses the window end + interval(A, B) + interval(A, C).
>>>However, I'd expect results to be output faster, i.e. when the watermark
>>>crosses window end + MAX(interval(A, B), interval(A, C)). If event B can
>>>happen 3 minutes after event A and event C can happen 5 minutes after A,
>>>there is no point in suspending output for 8 minutes (3+5) after the end
>>>of the window if we know that no more events can be matched after 5 min
>>>from the window end (assuming the window end is based on A's event time).
>>>Test that checks for this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>
>>> SQL issues:
>>>
>>>- WITH clause (in contrast to subquery) seems to create a static
>>>DataFrame that can't be used in streaming joins. Test that checks for 
>>> this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>- Two subqueries, each aggregating data using the window() function,
>>>break the output schema. Test that checks for this:
>>>
>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>
>>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark)
>>> so won't be able to provide fixes. But I hope the test cases I provided can
>>> be of some 

Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-11 Thread Mich Talebzadeh
Hi,

Thank you for your advice

This is the amended code

   def onQueryProgress(self, event):
print("onQueryProgress")
# Access micro-batch data
microbatch_data = event.progress
#print("microbatch_data received")  # Check if data is received
#print(microbatch_data)
#processed_rows_per_second =
microbatch_data.get("processed_rows_per_second")
processed_rows_per_second =
microbatch_data.get("processedRowsPerSecond")
print("CPC", processed_rows_per_second)
if processed_rows_per_second is not None:  # Check if value exists
   print("ocessed_rows_per_second retrieved")
   print(f"Processed rows per second: {processed_rows_per_second}")
else:
   print("processed_rows_per_second not retrieved!")

This is the output

onQueryStarted
'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
---
Batch: 0
---
+---+-+---+---+
|key|doubled_value|op_type|op_time|
+---+-+---+---+
+---+-+---+---+

onQueryProgress
---
Batch: 1
---
++-+---++
| key|doubled_value|op_type| op_time|
++-+---++
|a960f663-d13a-49c...|0|  1|2024-03-11 12:17:...|
++-+---++

onQueryProgress
---
Batch: 2
---
++-+---++
| key|doubled_value|op_type| op_time|
++-+---++
|a960f663-d13a-49c...|2|  1|2024-03-11 12:17:...|
++-+---++

I am afraid it is not working. Not even printing anything

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Mon, 11 Mar 2024 at 05:07, 刘唯  wrote:

> *now -> not
>
> 刘唯 wrote on Sun, 10 Mar 2024 at 22:04:
>
>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>> Camel case now snake case
>>
>> Mich Talebzadeh wrote on Sun, 10 Mar 2024 at 11:46:
>>
>>>
>>> There is a paper from Databricks on this subject
>>>
>>>
>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>
>>> But having tested it, there seems to be a bug there that I reported to
>>> Databricks forum as well (in answer to a user question)
>>>
>>> I have come to the conclusion that this is a bug. In general, there is a
>>> bug in obtaining individual values from the dictionary. For example, a bug
>>> in the way Spark Streaming is populating the processed_rows_per_second key
>>> within the microbatch_data -> microbatch_data = event.progress dictionary,
>>> or any other key. I have explored various debugging steps, and even though
>>> the key seems to exist, the value might not be getting set. Note that the
>>> dictionary itself prints the elements correctly. This is with regard to
>>> method onQueryProgress(self, event) in class
>>> MyListener(StreamingQueryListener):
>>>
>>> For example, with print(microbatch_data), you get everything printed as below
>>>
>>> onQueryProgress
>>> microbatch_data received
>>> {
>>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>> "name" : null,
>>> "timestamp" : "2024-03-10T09:21:27.233Z",
>>> "batchId" : 21,
>>> "numInputRows" : 1,
>>> "inputRowsPerSecond" : 100.0,
>>> "processedRowsPerSecond" : 5.347593582887701,
>>> "durationMs" : {
>>> "addBatch" : 37,
>>> "commitOffsets" : 41,
>>> "getBatch" : 0,
>>> "latestOffset" : 0,
>>> "queryPlanning" : 5,
>>> "triggerExecution" : 187,
>>> "walCommit" : 104
>>> },
>>> "stateOperators" : [ ],
>>> "sources" : [ {
>>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>>> numPartitions=default",
>>> "startOffset" : 20,
>>> "endOffset" : 21,
>>> "latestOffset" : 21,
>>> "numInputRows" : 1,
>>> "inputRowsPerSecond" : 100.0,
>>> "processedRowsPerSecond" :