Re: Spark StructuredStreaming - watermark not working as expected

2023-03-10 Thread karan alang
Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.

ldf.printSchema()

root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)


ldf.show():

+-------------+--------+--------+------------------------------------------+----------+-----------+
|applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
+-------------+--------+--------+------------------------------------------+----------+-----------+
|abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
|pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
|xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
+-------------+--------+--------+------------------------------------------+----------+-----------+

Also, any comments on the outputMode? I've set it to 'update', since I'm
using aggregation.
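
For reference, here's a minimal, self-contained sketch of how the two output modes
behave with a watermarked window aggregation. It uses the built-in rate source and a
made-up 'octets' column purely so it can run anywhere - it is not the pipeline from
this thread.

```
# Sketch only: generic rate source and placeholder column names.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum as sum_

spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()

# The rate source emits (timestamp, value); treat `value` as a fake octet count.
events = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
          .withColumnRenamed("timestamp", "ts")
          .withColumn("octets", col("value")))

agg = (events
       .withWatermark("ts", "15 minutes")                 # tolerate 15 min of lateness
       .groupBy(window(col("ts"), "15 minutes"))
       .agg(sum_("octets").alias("octets")))

# "update" re-emits a window on every trigger in which its aggregate changed;
# "append" emits each window exactly once, after the watermark passes the window end.
query = (agg.writeStream
         .outputMode("update")                            # or "append"
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()
```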

thanks!

On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh wrote:

>
> Just looking at the code in here:
>
> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                   window(col("ts"), "15 minutes")) \
>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>     .fillna(0)
>
> What does ldf.printSchema() return?
>
>
> HTH
>
> On Fri, 10 Mar 2023 at 07:16, karan alang  wrote:
>
>>
>> Hello All -
>>
>> I have a Structured Streaming job with a trigger of 10 minutes, and
>> I'm using a watermark to account for late-arriving data. However, the
>> watermark is not working as expected - instead of a single record with the
>> total aggregated value, I see 2 records.
>>
>> Here is the code :
>>
>> ```
>>
>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>
>>
>> df_stream = self.spark.readStream.format('kafka') \
>>     .option("kafka.security.protocol", "SSL") \
>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>     .option("subscribe", topic) \
>>     .option("startingOffsets", "latest") \
>>     .option("failOnDataLoss", "false") \
>>     .option("kafka.metadata.max.age.ms", "1000") \
>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>     .load()
>>
>> 2) calling foreachBatch(self.process)
>> # note - outputMode is set to "update" (tried setting outputMode = append as well)
>>
>> # 03/09 ::: outputMode - update instead of append
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>     .outputMode("update") \
>>     .trigger(processingTime='10 minutes') \
>>     .option("truncate", "false") \
>>     .option("checkpointLocation", self.checkpoint) \
>>     .foreachBatch(self.process) \
>>     .start()
>>
>>
>> self.process - where I do the bulk of the processing - calls the
>> function 'aggIntfLogs'.
>>
>> In function aggIntfLogs, I'm using a watermark of 15 mins and doing a groupBy
>> to calculate the sum of sentOctets & recvdOctets.
>>
>>
>> def aggIntfLogs(ldf):
>>     if ldf and ldf.count() > 0:
>>
>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>                          'recvdOctets', 'ts', 'customer') \
>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \

How to allocate vcores to driver (client mode)

2023-03-10 Thread sam smith
Hi,

I am launching a Spark program through code (client mode) to run on Hadoop.
Whenever I check the Executors tab of the Spark UI, I always see 0 as the number
of vcores for the driver. I tried to change that using *spark.driver.cores*,
and also *spark.yarn.am.cores*, in the SparkSession configuration, but in
vain. I also tried to set those parameters in spark-defaults but, again,
with no success.
Note that the Environment tab does display the right config.

Could this be the reason for a *collectAsList* freezing the execution (not
having enough CPU)?
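
For reference, a minimal sketch (placeholder app name and values) of where these
properties are usually supplied in code; per the Spark docs, spark.driver.cores is
only honoured in cluster mode, while spark.yarn.am.cores sets the cores of the YARN
Application Master in client mode.

```
# Sketch only: placeholder values; set before the session is created.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("driver-vcores-sketch")
         .master("yarn")
         .config("spark.driver.cores", "2")     # used by YARN only in cluster mode
         .config("spark.yarn.am.cores", "2")    # cores for the YARN AM in client mode
         .getOrCreate())

# Cross-check what the session actually picked up (same values the Environment tab shows):
print(spark.sparkContext.getConf().get("spark.driver.cores"))
print(spark.sparkContext.getConf().get("spark.yarn.am.cores"))
```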


Re: Spark StructuredStreaming - watermark not working as expected

2023-03-10 Thread Mich Talebzadeh
Just looking at the code in here:

ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                  window(col("ts"), "15 minutes")) \
    .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
    .fillna(0)

What does ldf.printSchema() return?


HTH


On Fri, 10 Mar 2023 at 07:16, karan alang  wrote:

>
> Hello All -
>
> I have a Structured Streaming job with a trigger of 10 minutes, and I'm
> using a watermark to account for late-arriving data. However, the watermark
> is not working as expected - instead of a single record with the total
> aggregated value, I see 2 records.
>
> Here is the code :
>
> ```
>
> 1) StructuredStreaming - Reading from Kafka every 10 mins
>
>
> df_stream = self.spark.readStream.format('kafka') \
>     .option("kafka.security.protocol", "SSL") \
>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>     .option("subscribe", topic) \
>     .option("startingOffsets", "latest") \
>     .option("failOnDataLoss", "false") \
>     .option("kafka.metadata.max.age.ms", "1000") \
>     .option("kafka.ssl.keystore.type", "PKCS12") \
>     .option("kafka.ssl.truststore.type", "PKCS12") \
>     .load()
>
> 2) calling foreachBatch(self.process)
> # note - outputMode is set to "update" (tried setting outputMode = append as well)
>
> # 03/09 ::: outputMode - update instead of append
> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>     .outputMode("update") \
>     .trigger(processingTime='10 minutes') \
>     .option("truncate", "false") \
>     .option("checkpointLocation", self.checkpoint) \
>     .foreachBatch(self.process) \
>     .start()
>
>
> self.process - where I do the bulk of the processing - calls the
> function 'aggIntfLogs'.
>
> In function aggIntfLogs, I'm using a watermark of 15 mins and doing a groupBy
> to calculate the sum of sentOctets & recvdOctets.
>
>
> def aggIntfLogs(ldf):
>     if ldf and ldf.count() > 0:
>
>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>                          'recvdOctets', 'ts', 'customer') \
>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>             .withWatermark("ts", "15 minutes")
>
>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                           window(col("ts"), "15 minutes")) \
>             .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>             .fillna(0)
>         return ldf
>     return ldf
>
>
> The DataFrame 'ldf' returned from the function aggIntfLogs is written
> to a Kafka topic.
>
> ```
>
> I was expecting that using the watermark would account for late-arriving
> data, i.e. that sentOctets & recvdOctets would be calculated over the
> consolidated data (including the late data, since it arrives within 15
> mins). However, I'm seeing 2 records for some of the keys
> (applianceName/timeslot/customer) - the aggregation is calculated
> individually for those records, and I see 2 records instead of a single
> record accounting for the late data within the watermark.
>
> What needs to be done to fix this & make this work as desired?
>
> tia!
>
>
> Here is the Stackoverflow link as well -
>
>
> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>
>
>
>


Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Mich Talebzadeh
For your Dataproc setup, what type of machines are you using - for example
n2-standard-4 with 4 vCPUs and 16GB - or something else? How many nodes, and is
autoscaling turned on?

Most likely an executor memory limit?
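
If it does turn out to be an executor memory limit, here is a minimal sketch
(placeholder values, not a recommendation for this job) of the knobs usually
raised at session creation:

```
# Sketch only: placeholder values; tune to the actual machine shape.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("executor-memory-sketch")
         .config("spark.executor.memory", "16g")          # executor JVM heap
         .config("spark.executor.memoryOverhead", "4g")   # off-heap room for shuffle/network buffers
         .config("spark.executor.cores", "4")
         .getOrCreate())
```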


HTH


On Fri, 10 Mar 2023 at 15:35, Gary Liu  wrote:

> Hi,
>
> I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2); the
> job involves multiple joins, as well as a complex UDF. I always get the
> FetchFailedException below, but the job does complete and the results look
> right. Neither of the 2 inputs is very big (one is 6.5M rows x 11 columns,
> ~150M in ORC format, the other 17.7M rows x 11 columns, ~400M in ORC format).
> It ran very smoothly on an on-premise Spark environment though.
>
> According to Google's documentation (
> https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
> there are 3 solutions:
> 1. Use EFM mode
> 2. Increase executor memory
> 3. Decrease the number of job partitions
>
> 1. I started the session from a Vertex notebook, so I don't know how to
> use EFM mode.
> 2. I increased executor memory from the default 12GB to 25GB, and the
> number of cores from 4 to 8, but it did not solve the problem.
> 3. I wonder how to do this - repartition the input datasets to have fewer
> partitions? I used df.rdd.getNumPartitions() to check the input data
> partitions; they have 9 and 17 partitions respectively - should I decrease
> them further? I also read a post on StackOverflow (
> https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se)
> saying increasing partitions may help. Which one makes more sense? I
> repartitioned the input data to 20 and 30 partitions, but still no luck.
>
> Any suggestions?
>
> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 
> 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 
> 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
> org.apache.spark.shuffle.FetchFailedException
>   at 
> org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>   at 
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at 

org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Gary Liu
Hi,

I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2); the
job involves multiple joins, as well as a complex UDF. I always get the
FetchFailedException below, but the job does complete and the results look
right. Neither of the 2 inputs is very big (one is 6.5M rows x 11 columns,
~150M in ORC format, the other 17.7M rows x 11 columns, ~400M in ORC format).
It ran very smoothly on an on-premise Spark environment though.

According to Google's documentation (
https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
there are 3 solutions:
1. Use EFM mode
2. Increase executor memory
3. Decrease the number of job partitions

1. I started the session from a Vertex notebook, so I don't know how to use
EFM mode.
2. I increased executor memory from the default 12GB to 25GB, and the
number of cores from 4 to 8, but it did not solve the problem.
3. I wonder how to do this - repartition the input datasets to have fewer
partitions (see the sketch after this list)? I used df.rdd.getNumPartitions()
to check the input data partitions; they have 9 and 17 partitions
respectively - should I decrease them further? I also read a post on
StackOverflow (
https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se)
saying increasing partitions may help. Which one makes more sense? I
repartitioned the input data to 20 and 30 partitions, but still no luck.
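
To make point 3 concrete, here is a minimal, self-contained sketch of the two
partition knobs usually meant by "number of job partitions" (synthetic DataFrames
and placeholder numbers, not this job's data):

```
# Sketch only: synthetic inputs stand in for the two ORC DataFrames.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-sketch").getOrCreate()

# Partitions produced by shuffles (joins, aggregations) - usually what
# "number of job partitions" refers to, as opposed to the input file partitions.
spark.conf.set("spark.sql.shuffle.partitions", "200")

df1 = spark.range(0, 1_000_000).withColumnRenamed("id", "key")
df2 = spark.range(0, 1_000_000).withColumnRenamed("id", "key")

# Repartitioning the inputs (what df.rdd.getNumPartitions() reports) controls
# how the join work is spread before the shuffle.
result = df1.repartition(20, "key").join(df2.repartition(30, "key"), "key")
result.write.mode("overwrite").format("noop").save()   # noop sink just forces execution
```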

Any suggestions?

23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0
(TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72,
10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457,
reduceId=58, message=
org.apache.spark.shuffle.FetchFailedException
at 
org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
at 
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at