Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Aaron Grubb
Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job,
but it still was not stable in the face of spot instances going away. Adding
spark.decommission.enabled=true, spark.storage.decommission.enabled=true and
spark.executor.decommission.killInterval=110 appears to have completely
stabilized the job (not sure which did the trick as I added them at the same
time). Perhaps extra documentation or clarifications should be added, as it
doesn't seem clear to me how to arrive at job stability using dynamic
allocation without trial and error.
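
For reference, the combination described above as spark-defaults.conf entries
(a minimal sketch based only on what I changed; whether every one of them is
strictly required is untested, since they were added at the same time):

spark.shuffle.useOldFetchProtocol         true
spark.decommission.enabled                true
spark.storage.decommission.enabled        true
spark.executor.decommission.killInterval  110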

On Mon, 2024-08-19 at 13:01 +0000, Aaron Grubb wrote:
> Hi all,
>
> I'm running Spark on Kubernetes on AWS using only spot instances for 
> executors with dynamic allocation enabled. This particular job is
> being
> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had 
> recently switched to using PersistentVolumeClaims in Spark
> with
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  but kept
> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
> under the notes for spark.dynamicAllocation.enabled [2] that these
> configurations are "or" not "and". However, when setting 
> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the
> message
>
> org.apache.spark.SparkException: Dynamic allocation of executors requires one 
> of the following conditions: 1) enabling external shuffle
> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
> through spark.dynamicAllocation.shuffleTracking.enabled. 3)
> enabling shuffle blocks decommission through spark.decommission.enabled and 
> spark.storage.decommission.shuffleBlocks.enabled. 4)
> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom 
> ShuffleDataIO who's ShuffleDriverComponents supports reliable
> storage.
>
> Am I hitting this bug unavoidably? Or is there a configuration I'm missing to 
> enable
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  to replace
> spark.dynamicAllocation.shuffleTracking.enabled=true?
>
> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>
> spark.checkpoint.compress  true
> spark.driver.cores  1
> spark.driver.maxResultSize  2g
> spark.driver.memory  5140m
> spark.dynamicAllocation.enabled  true
> spark.dynamicAllocation.executorAllocationRatio  0.33
> spark.dynamicAllocation.maxExecutors  20
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  30
> spark.eventLog.enabled  true
> spark.executor.cores  3
> spark.executor.logs.rolling.enableCompression  true
> spark.executor.logs.rolling.maxRetainedFiles  48
> spark.executor.logs.rolling.strategy  time
> spark.executor.logs.rolling.time.interval  hourly
> spark.hadoop.fs.s3a.impl  org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.connection.ssl.enabled  false
> spark.hadoop.fs.s3a.fast.upload  true
> spark.kryo.registrationRequired  false
> spark.kryo.unsafe  false
> spark.kryoserializer.buffer  1m
> spark.kryoserializer.buffer.max  1g
> spark.kubernetes.driver.limit.cores  750m
> spark.kubernetes.driver.ownPersistentVolumeClaim  true

Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-19 Thread Aaron Grubb
Hi all,

I'm running Spark on Kubernetes on AWS using only spot instances for executors 
with dynamic allocation enabled. This particular job is being
triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had 
recently switched to using PersistentVolumeClaims in Spark with
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
 but kept
spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see under 
the notes for spark.dynamicAllocation.enabled [2] that these
configurations are "or" not "and". However, when setting 
spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the
message

org.apache.spark.SparkException: Dynamic allocation of executors requires one 
of the following conditions: 1) enabling external shuffle
service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
through spark.dynamicAllocation.shuffleTracking.enabled. 3)
enabling shuffle blocks decommission through spark.decommission.enabled and 
spark.storage.decommission.shuffleBlocks.enabled. 4)
(Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom 
ShuffleDataIO who's ShuffleDriverComponents supports reliable
storage.
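
Spelled out, conditions 3) and 4) from that message would presumably look like
this in spark-defaults.conf (just my reading of the exception text, not
something I have verified):

# Condition 3: shuffle block decommissioning
spark.decommission.enabled                        true
spark.storage.decommission.shuffleBlocks.enabled  true

# Condition 4: a ShuffleDataIO plugin backed by reliable storage
spark.shuffle.sort.io.plugin.class  org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO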

Am I hitting this bug unavoidably? Or is there a configuration I'm missing to 
enable
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
 to replace
spark.dynamicAllocation.shuffleTracking.enabled=true?

Using Spark 3.5.1 - here's my full spark-defaults.conf just in case

spark.checkpoint.compress  true
spark.driver.cores  1
spark.driver.maxResultSize  2g
spark.driver.memory  5140m
spark.dynamicAllocation.enabled  true
spark.dynamicAllocation.executorAllocationRatio  0.33
spark.dynamicAllocation.maxExecutors  20
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  30
spark.eventLog.enabled  true
spark.executor.cores  3
spark.executor.logs.rolling.enableCompression  true
spark.executor.logs.rolling.maxRetainedFiles  48
spark.executor.logs.rolling.strategy  time
spark.executor.logs.rolling.time.interval  hourly
spark.hadoop.fs.s3a.impl  org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.connection.ssl.enabled  false
spark.hadoop.fs.s3a.fast.upload  true
spark.kryo.registrationRequired  false
spark.kryo.unsafe  false
spark.kryoserializer.buffer  1m
spark.kryoserializer.buffer.max  1g
spark.kubernetes.driver.limit.cores  750m
spark.kubernetes.driver.ownPersistentVolumeClaim  true
spark.kubernetes.driver.request.cores  750m
spark.kubernetes.driver.reusePersistentVolumeClaim  true
spark.kubernetes.driver.waitToReusePersistentVolumeClaim  true
spark.kubernetes.executor.limit.cores  3700m
spark.kubernetes.executor.request.cores  3700m
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName  OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path  /data/spark-x/executor-x
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly  false
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit  20Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass  ebs-sc
spark.kubernetes.namespace

Re: [Spark]: Spark / Iceberg / hadoop-aws compatibility matrix

2024-04-03 Thread Aaron Grubb
Downgrade to hadoop-*:3.3.x. Hadoop 3.4.x is based on the AWS SDK v2 and should 
probably be considered breaking for tools that were built against Hadoop < 3.4.0 
while using AWS.
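
As a concrete starting point (an assumption to verify against your stack, not a
combination tested in this thread), keeping everything on the Hadoop 3.3.x /
AWS SDK v1 line would look something like this in spark-defaults.conf:

spark.jars.packages  org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1,org.apache.iceberg:iceberg-aws-bundle:1.4.1

hadoop-aws:3.3.4 should line up with the Hadoop client bundled with Spark 3.4.x
and pulls the v1 aws-java-sdk-bundle transitively; note that the Iceberg runtime
artifact uses an underscore before the Scala version (iceberg-spark-runtime-3.4_2.12).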

From: Oxlade, Dan 
Sent: Wednesday, April 3, 2024 2:41:11 PM
To: user@spark.apache.org 
Subject: [Spark]: Spark / Iceberg / hadoop-aws compatibility matrix


Hi all,

I’ve struggled with this for quite some time.

My requirement is to read a parquet file from S3 into a DataFrame, then append 
to an existing Iceberg table.

In order to read the parquet I need the hadoop-aws dependency for s3a://. In 
order to write to Iceberg I need the Iceberg dependency. Both of these 
dependencies have a transitive dependency on the AWS SDK. I can’t find versions 
for Spark 3.4 that work together.

Current versions:

Spark 3.4.1
iceberg-spark-runtime-3.4-2.12:1.4.1
iceberg-aws-bundle:1.4.1
hadoop-aws:3.4.0
hadoop-common:3.4.0

I’ve tried a number of combinations of the above and their respective versions, 
but all fall over with their assumptions about the AWS SDK version, with 
class-not-found or method-not-found exceptions, etc.

Is there a compatibility matrix somewhere that someone could point me to?

Thanks

Dan



Re: Spark reading from HBase using hbase-connectors - any benefit from localization?

2023-01-06 Thread Aaron Grubb
Hi Mich,

Thanks a lot for the insight, it was very helpful.

Aaron

On Thu, 2023-01-05 at 23:44 +0000, Mich Talebzadeh wrote:
Hi Aaron,

Thanks for the details.

It is general practice when running Spark on premise to use Hadoop clusters 
<https://spark.apache.org/faq.html#:~:text=How%20does%20Spark%20relate%20to,Hive%2C%20and%20any%20Hadoop%20InputForm>. 
This comes from the notion of data locality, which in simple terms means doing 
the computation on the node where the data resides. As you are already aware, 
Spark is a cluster computing system; it is not a storage system like HDFS or 
HBase. Spark is used to process the data stored in such distributed systems. 
When a Spark application processes data stored in HDFS, for example Parquet 
files, Spark will attempt to place computation tasks alongside the HDFS blocks.

With HDFS, the Spark driver contacts the NameNode for the DataNodes (ideally 
local) containing the various blocks of a file or directory, as well as their 
locations (represented as InputSplits), and then schedules the work to the 
Spark workers.

Moving on, Spark on Hadoop communicates with Hive using an efficient API, 
without the need for JDBC drivers, so that is another advantage here.

Spark can talk to HBase through the Spark-HBase connector 
<https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_HBase_Connector.md>, 
which provides HBaseContext for Spark to interact with HBase. HBaseContext 
pushes the configuration to the Spark executors and allows each Spark executor 
to have its own HBase connection.
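
As an illustration only, a minimal PySpark sketch of a DataFrame read through
the hbase-connectors data source (the table name and column mapping below are
made-up placeholders, and the option names are as I recall them from the
connector README, so verify them against your version):

from pyspark.sql import SparkSession

# Assumes the hbase-connectors spark jars and an hbase-site.xml are already
# on the driver and executor classpath.
spark = SparkSession.builder.appName("hbase-read-sketch").getOrCreate()

events = (
    spark.read
    .format("org.apache.hadoop.hbase.spark")
    # Hypothetical table: rowkey plus one column family "d"
    .option("hbase.table", "events")
    .option("hbase.columns.mapping",
            "rowkey STRING :key, payload STRING d:payload")
    .load()
)

events.show(5)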


With regard to your question:


 Would running Spark on YARN on the same machines where both HDFS and HBase are 
running provide localization benefits when Spark reads from HBase, or are 
localization benefits negligible and it's a better idea to put Spark in a 
standalone cluster?


As per my previous points, I believe it does: HBaseContext pushes the 
configuration to the Spark executors and allows each Spark executor to have its 
own HBase connection. Putting Spark on a standalone cluster will add to the cost 
and IMO will not achieve much.


HTH

 




On Thu, 5 Jan 2023 at 22:53, Aaron Grubb <aa...@kaden.ai> wrote:
Hi Mich,

Thanks for your reply. In hindsight I realize I didn't provide enough 
information about the infrastructure for the question to be answered properly. 
We are currently running a Hadoop cluster with nodes that have the following 
services:

- HDFS NameNode (3.3.4)
- YARN NodeManager (3.3.4)
- HBase RegionServer (2.4.15)
- LLAP on YARN (3.1.3)

So to answer your questions directly: putting Spark on the Hadoop nodes is the 
first idea I had, in order to colocate Spark with HBase for reads (HBase is 
sharing nodes with Hadoop, to answer the second question). However, what 
currently happens is that when a Hive query runs that either reads from or 
writes to HBase, there ends up being resource contention, as HBase threads 
"spill over" onto vcores that are in theory reserved for YARN. We tolerate this 
so that both LLAP and HBase benefit from short-circuit reads, but when it comes 
to Spark, I was hoping to find out whether that same localization benefit would 
exist when reading from HBase, or whether it would be better to incur the cost 
of inter-server, intra-VPC traffic in order to avoid resource contention 
between Spark and HBase during data loading.

Regarding HBase being the speed layer and Parquet files being the batch layer, 
I was more looking at both of them as the batch layer; the role HBase plays is 
that it reduces the amount of data scanning and joining needed to support our 
use case. Basically, we receive events that number in the thousands, and those 
events need to be matched to events that number in the hundreds of millions, 
but they both share a UUIDv4, so instead of matching those rows in an MR-style 
job, we run simple inserts into HBase with the UUIDv4 as the table key. The 
Parquet files would end up being data from HBase that is past the window for us 
to receive more events for that UUIDv4, i.e. static data. I'm happy to draw up 
a diagram, but hopefully these details are enough for an understanding of the 
question.

To attempt to summarize, would running Spark on YARN on the same machines where 
both HDFS and HBase are running provide localization benefits when Spark reads 
from HBase, or are localization benefits negligible and it's a better idea to 
put Spark in a standalone cluster?

Re: Spark reading from HBase using hbase-connectors - any benefit from localization?

2023-01-05 Thread Aaron Grubb
Hi Mich,

Thanks for your reply. In hindsight I realize I didn't provide enough 
information about the infrastructure for the question to be answered properly. 
We are currently running a Hadoop cluster with nodes that have the following 
services:

- HDFS NameNode (3.3.4)
- YARN NodeManager (3.3.4)
- HBase RegionServer (2.4.15)
- LLAP on YARN (3.1.3)

So to answer your questions directly: putting Spark on the Hadoop nodes is the 
first idea I had, in order to colocate Spark with HBase for reads (HBase is 
sharing nodes with Hadoop, to answer the second question). However, what 
currently happens is that when a Hive query runs that either reads from or 
writes to HBase, there ends up being resource contention, as HBase threads 
"spill over" onto vcores that are in theory reserved for YARN. We tolerate this 
so that both LLAP and HBase benefit from short-circuit reads, but when it comes 
to Spark, I was hoping to find out whether that same localization benefit would 
exist when reading from HBase, or whether it would be better to incur the cost 
of inter-server, intra-VPC traffic in order to avoid resource contention 
between Spark and HBase during data loading.

Regarding HBase being the speed layer and Parquet files being the batch layer, 
I was more looking at both of them as the batch layer; the role HBase plays is 
that it reduces the amount of data scanning and joining needed to support our 
use case. Basically, we receive events that number in the thousands, and those 
events need to be matched to events that number in the hundreds of millions, 
but they both share a UUIDv4, so instead of matching those rows in an MR-style 
job, we run simple inserts into HBase with the UUIDv4 as the table key. The 
Parquet files would end up being data from HBase that is past the window for us 
to receive more events for that UUIDv4, i.e. static data. I'm happy to draw up 
a diagram, but hopefully these details are enough for an understanding of the 
question.

To attempt to summarize, would running Spark on YARN on the same machines where 
both HDFS and HBase are running provide localization benefits when Spark reads 
from HBase, or are localization benefits negligible and it's a better idea to 
put Spark in a standalone cluster?

Thanks for your time,
Aaron

On Thu, 2023-01-05 at 19:00 +0000, Mich Talebzadeh wrote:
A few questions:

  *   As I understand it, you already have a Hadoop cluster. Are you going to 
put your Spark on the Hadoop nodes?
  *   Where is your HBase cluster? Is it sharing nodes with Hadoop, or does it 
have its own cluster?

I looked at that link and it does not say much. Essentially you want to use 
HBase for the speed layer, and your inactive data is stored in Parquet files on 
HDFS, so that is your batch layer, so to speak.

Have a look at this article of mine, Real Time Processing of Trade Data with 
Kafka, Flume, Spark, HBase and MongoDB 
<https://www.linkedin.com/pulse/real-time-processing-trade-data-kafka-flume-spark-talebzadeh-ph-d-/>, 
a bit dated but still valid.


It helps if you provide an architectural diagram of your proposed solution.


You then need to do a PoC to see how it looks.


HTH
 




On Thu, 5 Jan 2023 at 09:35, Aaron Grubb <aa...@kaden.ai> wrote:
(cross-posting from the HBase user list as I didn't receive a reply there)

Hello,

I'm completely new to Spark and evaluating setting up a cluster either on YARN 
or standalone. Our idea for the general workflow is to create a concatenated 
dataframe using historical pickle/parquet files (whichever is faster) and 
current data stored in HBase. I'm aware of the benefit of short-circuit reads 
if the historical files are stored in HDFS, but I'm more concerned about 
resource contention between Spark and HBase during data loading. My question 
is, would running Spark on the same nodes provide a benefit when using 
hbase-connectors (https://github.com/apache/hbase-connectors/tree/master/spark)? 
Is there a mechanism in the connector to "pass through" a short-circuit read to 
Spark, or would data always bounce from HDFS -> RegionServer -> Spark?

Thanks in advance,
Aaron



Spark reading from HBase using hbase-connectors - any benefit from localization?

2023-01-05 Thread Aaron Grubb
(cross-posting from the HBase user list as I didn't receive a reply there)

Hello,

I'm completely new to Spark and evaluating setting up a cluster either on YARN 
or standalone. Our idea for the general workflow is to create a concatenated 
dataframe using historical pickle/parquet files (whichever is faster) and 
current data stored in HBase. I'm aware of the benefit of short-circuit reads 
if the historical files are stored in HDFS, but I'm more concerned about 
resource contention between Spark and HBase during data loading. My question 
is, would running Spark on the same nodes provide a benefit when using 
hbase-connectors (https://github.com/apache/hbase-connectors/tree/master/spark)? 
Is there a mechanism in the connector to "pass through" a short-circuit read to 
Spark, or would data always bounce from HDFS -> RegionServer -> Spark?

Thanks in advance,
Aaron