Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job, but it still was not stable when spot instances went away. Adding spark.decommission.enabled=true, spark.storage.decommission.enabled=true and spark.executor.decommission.killInterval=110 appears to have completely stabilized the job (I'm not sure which one did the trick, as I added them all at the same time). Perhaps extra documentation or clarification should be added, as it isn't clear to me how to arrive at job stability with dynamic allocation without trial and error.

On Mon, 2024-08-19 at 13:01 +0000, Aaron Grubb wrote:
> Hi all,
>
> I'm running Spark on Kubernetes on AWS using only spot instances for executors with dynamic allocation enabled. This particular job is being triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had recently switched to using PersistentVolumeClaims in Spark with
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see under the notes for spark.dynamicAllocation.enabled [2] that these configurations are "or" not "and". However, when setting spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the message
>
> org.apache.spark.SparkException: Dynamic allocation of executors requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled. 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom ShuffleDataIO who's ShuffleDriverComponents supports reliable storage.
>
> Am I hitting this bug unavoidably? Or is there a configuration I'm missing to enable
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> to replace spark.dynamicAllocation.shuffleTracking.enabled=true?
>
> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>
> spark.checkpoint.compress true
> spark.driver.cores 1
> spark.driver.maxResultSize 2g
> spark.driver.memory 5140m
> spark.dynamicAllocation.enabled true
> spark.dynamicAllocation.executorAllocationRatio 0.33
> spark.dynamicAllocation.maxExecutors 20
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 30
> spark.eventLog.enabled true
> spark.executor.cores 3
> spark.executor.logs.rolling.enableCompression true
> spark.executor.logs.rolling.maxRetainedFiles 48
> spark.executor.logs.rolling.strategy time
> spark.executor.logs.rolling.time.interval hourly
> spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.connection.ssl.enabled false
> spark.hadoop.fs.s3a.fast.upload true
> spark.kryo.registrationRequired false
> spark.kryo.unsafe false
> spark.kryoserializer.buffer 1m
> spark.kryoserializer.buffer.max 1g
> spark.kubernetes.driver.limit.cores 750m
> spark.kubernetes.driver.ownPersistentVolumeClaim
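For anyone who wants to try the same combination, here is a minimal Python sketch that collects the four settings named in the reply above and renders them as spark-submit --conf arguments. The helper name to_submit_args is hypothetical; only the keys and values come from the reply, and which of them is actually responsible for the improved stability is not known.

```python
# Settings exactly as listed in the reply; nothing here is tuned or verified.
DECOMMISSION_SETTINGS = {
    "spark.shuffle.useOldFetchProtocol": "true",
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.enabled": "true",
    "spark.executor.decommission.killInterval": "110",
}


def to_submit_args(settings):
    """Render a settings dict as a flat list of spark-submit --conf arguments.

    Hypothetical helper for illustration; it only formats strings.
    """
    args = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


if __name__ == "__main__":
    # Print the arguments on one line so they can be pasted after spark-submit.
    print(" ".join(to_submit_args(DECOMMISSION_SETTINGS)))
```

The same keys could equally be appended to spark-defaults.conf, one "key value" pair per line, which is the format used in the original post.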
Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
Hi all,

I'm running Spark on Kubernetes on AWS using only spot instances for executors with dynamic allocation enabled. This particular job is being triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had recently switched to using PersistentVolumeClaims in Spark with spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see under the notes for spark.dynamicAllocation.enabled [2] that these configurations are "or" not "and". However, when setting spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the message

org.apache.spark.SparkException: Dynamic allocation of executors requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled. 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom ShuffleDataIO who's ShuffleDriverComponents supports reliable storage.

Am I hitting this bug unavoidably? Or is there a configuration I'm missing to enable spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO to replace spark.dynamicAllocation.shuffleTracking.enabled=true?

Using Spark 3.5.1 - here's my full spark-defaults.conf just in case

spark.checkpoint.compress true
spark.driver.cores 1
spark.driver.maxResultSize 2g
spark.driver.memory 5140m
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.executorAllocationRatio 0.33
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 30
spark.eventLog.enabled true
spark.executor.cores 3
spark.executor.logs.rolling.enableCompression true
spark.executor.logs.rolling.maxRetainedFiles 48
spark.executor.logs.rolling.strategy time
spark.executor.logs.rolling.time.interval hourly
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.fast.upload true
spark.kryo.registrationRequired false
spark.kryo.unsafe false
spark.kryoserializer.buffer 1m
spark.kryoserializer.buffer.max 1g
spark.kubernetes.driver.limit.cores 750m
spark.kubernetes.driver.ownPersistentVolumeClaim true
spark.kubernetes.driver.request.cores 750m
spark.kubernetes.driver.reusePersistentVolumeClaim true
spark.kubernetes.driver.waitToReusePersistentVolumeClaim true
spark.kubernetes.executor.limit.cores 3700m
spark.kubernetes.executor.request.cores 3700m
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path /data/spark-x/executor-x
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly false
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit 20Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass ebs-sc
spark.kubernetes.namespac
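The four conditions in the exception message quoted in this post can be written down as a small pre-flight check. The following Python sketch is an illustration only, not Spark's own validation code: the function names are hypothetical, any unset key is treated as false (which may differ from Spark's real defaults), and condition 4 cannot be decided from configuration alone, so the caller has to state whether the configured plugin reports reliable storage.

```python
def _enabled(conf, key):
    """Return True if the key is present and set to 'true' (case-insensitive).

    Assumption: unset keys count as false; Spark's actual defaults may differ.
    """
    return str(conf.get(key, "false")).strip().lower() == "true"


def satisfied_conditions(conf, plugin_has_reliable_storage=False):
    """Return the numbers (1 to 4) of the conditions from the error message
    that the given configuration dict appears to satisfy."""
    met = []
    if _enabled(conf, "spark.shuffle.service.enabled"):
        met.append(1)
    if _enabled(conf, "spark.dynamicAllocation.shuffleTracking.enabled"):
        met.append(2)
    if _enabled(conf, "spark.decommission.enabled") and _enabled(
        conf, "spark.storage.decommission.shuffleBlocks.enabled"
    ):
        met.append(3)
    # Condition 4 depends on the plugin implementation, not just on the key
    # being set, so it is passed in by the caller.
    if "spark.shuffle.sort.io.plugin.class" in conf and plugin_has_reliable_storage:
        met.append(4)
    return met


if __name__ == "__main__":
    failing = {
        "spark.dynamicAllocation.shuffleTracking.enabled": "false",
        "spark.shuffle.sort.io.plugin.class":
            "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO",
    }
    print(satisfied_conditions(failing))
```

With the configuration from the post (shuffle tracking disabled, plugin set, and the plugin assumed not to report reliable storage) the list comes back empty, which matches the crash described above. Setting the plugin class by itself does not appear to be enough to meet condition 4.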
Re: [Spark]: Spark / Iceberg / hadoop-aws compatibility matrix
Downgrade to hadoop-*:3.3.x. Hadoop 3.4.x is based on the AWS SDK v2 and should probably be considered a breaking change for tools that were built against < 3.4.0 and use AWS.

From: Oxlade, Dan
Sent: Wednesday, April 3, 2024 2:41:11 PM
To: user@spark.apache.org
Subject: [Spark]: Spark / Iceberg / hadoop-aws compatibility matrix

Hi all,

I’ve struggled with this for quite some time. My requirement is to read a parquet file from s3 to a Dataframe then append to an existing iceberg table.

In order to read the parquet I need the hadoop-aws dependency for s3a:// . In order to write to iceberg I need the iceberg dependency. Both of these dependencies have a transitive dependency on the aws SDK. I can’t find versions for Spark 3.4 that work together.

Current Versions:
Spark 3.4.1
iceberg-spark-runtime-3.4-2.12:1.4.1
iceberg-aws-bundle:1.4.1
hadoop-aws:3.4.0
hadoop-common:3.4.0

I’ve tried a number of combinations of the above and their respective versions but all fall over with their assumptions on the aws sdk version with class not found exceptions or method not found etc. Is there a compatibility matrix somewhere that someone could point me to?

Thanks
Dan
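The advice in the reply can be turned into a quick sanity check over a dependency list. The Python sketch below is a rough illustration under stated assumptions: the function name check_hadoop_artifacts is hypothetical, the "3.4 and later means AWS SDK v2" rule is taken from the reply, and the extra check that all hadoop-* artifacts share one version is an added assumption rather than something stated in the thread. It does not replace a real compatibility matrix.

```python
def check_hadoop_artifacts(versions):
    """Return a list of warnings for a dict mapping artifact name to version.

    Only artifacts whose name starts with 'hadoop-' are inspected.
    """
    warnings = []
    hadoop = {name: ver for name, ver in versions.items() if name.startswith("hadoop-")}

    # Assumption: hadoop-* artifacts should all be on the same version.
    if len(set(hadoop.values())) > 1:
        warnings.append("hadoop-* artifacts are on different versions")

    for name in sorted(hadoop):
        major, minor = (int(part) for part in hadoop[name].split(".")[:2])
        # Rule from the reply: Hadoop 3.4.x is based on the AWS SDK v2.
        if (major, minor) >= (3, 4):
            warnings.append(
                f"{name}:{hadoop[name]} is based on the AWS SDK v2; consider 3.3.x"
            )
    return warnings


if __name__ == "__main__":
    current = {
        "iceberg-spark-runtime-3.4-2.12": "1.4.1",
        "iceberg-aws-bundle": "1.4.1",
        "hadoop-aws": "3.4.0",
        "hadoop-common": "3.4.0",
    }
    for line in check_hadoop_artifacts(current):
        print(line)
```

Run against the versions listed in the question, this flags both hadoop-aws and hadoop-common, which is the downgrade the reply suggests.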
Re: Spark reading from HBase using hbase-connectors - any benefit from localization?
Hi Mich,

Thanks a lot for the insight, it was very helpful.

Aaron

On Thu, 2023-01-05 at 23:44 +, Mich Talebzadeh wrote:

Hi Aaron,

Thanks for the details.

It is a general practice when running Spark on premise to use Hadoop clusters <https://spark.apache.org/faq.html#:~:text=How%20does%20Spark%20relate%20to,Hive%2C%20and%20any%20Hadoop%20InputForm>. This comes from the notion of data locality. Data locality in simple terms means doing computation on the node where the data resides. As you are already aware, Spark is a cluster computing system. It is not a storage system like HDFS or HBase. Spark is used to process the data stored in such distributed systems. If a Spark application is processing data stored in HDFS, for example Parquet files, Spark will attempt to place computation tasks alongside the HDFS blocks. With HDFS, the Spark driver contacts the NameNode about the DataNodes (ideally local) containing the various blocks of a file or directory as well as their locations (represented as InputSplits), and then schedules the work to the Spark Workers.

Moving on, Spark on Hadoop communicates with Hive through an efficient API, without the need for JDBC drivers, so that is another advantage here.

Spark can talk to HBase through the Spark-HBase connector <https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_HBase_Connector.md>, which provides HBaseContext for Spark to interact with HBase. HBaseContext pushes the configuration to the Spark executors and allows it to have an HBase Connection per Spark Executor.

With regard to your question:

Would running Spark on YARN on the same machines where both HDFS and HBase are running provide localization benefits when Spark reads from HBase, or are localization benefits negligible and it's a better idea to put Spark in a standalone cluster?

As per my previous points, I believe it does --> HBaseContext pushes the configuration to the Spark executors and allows it to have an HBase Connection per Spark Executor. Putting Spark on a standalone cluster will add to the cost and IMO will not achieve much.

HTH

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Thu, 5 Jan 2023 at 22:53, Aaron Grubb <aa...@kaden.ai> wrote:

Hi Mich,

Thanks for your reply. In hindsight I realize I didn't provide enough information about the infrastructure for the question to be answered properly. We are currently running a Hadoop cluster with nodes that have the following services:

- HDFS NameNode (3.3.4)
- YARN NodeManager (3.3.4)
- HBase RegionServer (2.4.15)
- LLAP on YARN (3.1.3)

So to answer your questions directly, putting Spark on the Hadoop nodes is the first idea that I had in order to colocate Spark with HBase for reads (HBase is sharing nodes with Hadoop, to answer the second question). However, what currently happens is that when a Hive query runs that either reads from or writes to HBase, there ends up being resource contention as HBase threads "spill over" onto vcores that are in theory reserved for YARN. We tolerate this in order for both LLAP and HBase to benefit from short-circuit reads, but when it comes to Spark, I was hoping to find out if that same localization benefit would exist when reading from HBase, or if it would be better to incur the cost of inter-server, intra-VPC traffic in order to avoid resource contention between Spark and HBase during data loading.

Regarding HBase being the speed layer and Parquet files being the batch layer, I was looking at both of them more as the batch layer; the role HBase plays is that it reduces the amount of data scanning and joining needed to support our use case. Basically we receive events that number in the thousands, and those events need to be matched to events that number in the hundreds of millions, but they both share a UUIDv4, so instead of matching those rows in an MR-style job, we run simple inserts into HBase with the UUIDv4 as the table key. The Parquet files would end up being data from HBase that are past the window for us to receive more events for that UUIDv4, i.e. static data.

I'm happy to draw up a diagram but hopefully these details are enough for an understanding of the question.

To attempt to summarize, would running Spark on YARN on the same machines where both HDFS and HBase are running provi
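To make the data-locality idea from the quoted reply concrete, here is a toy Python sketch of locality-preferring placement: each input split is given to an executor on a host that stores the split if one exists, and to any executor otherwise. This is not Spark's scheduler. The function name and the host names are hypothetical, and the labels NODE_LOCAL and ANY are borrowed from Spark's locality levels only as tags.

```python
def assign_splits(split_hosts, executor_hosts):
    """Map each split to (executor_host, locality_label).

    split_hosts: dict of split name -> set of hosts that store that split.
    executor_hosts: list of hosts on which executors are running.
    """
    if not executor_hosts:
        raise ValueError("at least one executor host is required")
    executors = list(executor_hosts)
    assignments = {}
    for index, split in enumerate(sorted(split_hosts)):
        # Executors that sit on a host holding this split's data.
        local = [host for host in executors if host in split_hosts[split]]
        if local:
            assignments[split] = (local[0], "NODE_LOCAL")
        else:
            # No co-located executor: fall back to a simple round-robin choice.
            assignments[split] = (executors[index % len(executors)], "ANY")
    return assignments


if __name__ == "__main__":
    splits = {"split-0": {"node-a"}, "split-1": {"node-c"}}
    for split, placement in assign_splits(splits, ["node-a", "node-b"]).items():
        print(split, placement)
```

In this toy model, a split whose data lives on a host without an executor always ends up with the ANY label, which is the situation a separate standalone cluster would create for every read.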
Re: Spark reading from HBase using hbase-connectors - any benefit from localization?
Hi Mich,

Thanks for your reply. In hindsight I realize I didn't provide enough information about the infrastructure for the question to be answered properly. We are currently running a Hadoop cluster with nodes that have the following services:

- HDFS NameNode (3.3.4)
- YARN NodeManager (3.3.4)
- HBase RegionServer (2.4.15)
- LLAP on YARN (3.1.3)

So to answer your questions directly, putting Spark on the Hadoop nodes is the first idea that I had in order to colocate Spark with HBase for reads (HBase is sharing nodes with Hadoop, to answer the second question). However, what currently happens is that when a Hive query runs that either reads from or writes to HBase, there ends up being resource contention as HBase threads "spill over" onto vcores that are in theory reserved for YARN. We tolerate this in order for both LLAP and HBase to benefit from short-circuit reads, but when it comes to Spark, I was hoping to find out if that same localization benefit would exist when reading from HBase, or if it would be better to incur the cost of inter-server, intra-VPC traffic in order to avoid resource contention between Spark and HBase during data loading.

Regarding HBase being the speed layer and Parquet files being the batch layer, I was looking at both of them more as the batch layer; the role HBase plays is that it reduces the amount of data scanning and joining needed to support our use case. Basically we receive events that number in the thousands, and those events need to be matched to events that number in the hundreds of millions, but they both share a UUIDv4, so instead of matching those rows in an MR-style job, we run simple inserts into HBase with the UUIDv4 as the table key. The Parquet files would end up being data from HBase that are past the window for us to receive more events for that UUIDv4, i.e. static data.

I'm happy to draw up a diagram but hopefully these details are enough for an understanding of the question.

To attempt to summarize, would running Spark on YARN on the same machines where both HDFS and HBase are running provide localization benefits when Spark reads from HBase, or are localization benefits negligible and it's a better idea to put Spark in a standalone cluster?

Thanks for your time,
Aaron

On Thu, 2023-01-05 at 19:00 +, Mich Talebzadeh wrote:

Few questions

* As I understand it, you already have a Hadoop cluster. Are you going to put Spark on the Hadoop nodes?
* Where is your HBase cluster? Is it sharing nodes with Hadoop or does it have its own cluster?

I looked at that link and it does not say much. Essentially you want to use HBase for the speed layer, and your inactive data is stored in Parquet files on HDFS. So that is your batch layer, so to speak.

Have a look at this article of mine, Real Time Processing of Trade Data with Kafka, Flume, Spark, Hbase and MongoDB <https://www.linkedin.com/pulse/real-time-processing-trade-data-kafka-flume-spark-talebzadeh-ph-d-/>, a bit dated but still valid.

* It helps if you provide an architectural diagram of your proposed solution. You then need to do a PoC to see how it looks.

HTH

On Thu, 5 Jan 2023 at 09:35, Aaron Grubb <aa...@kaden.ai> wrote:

(cross-posting from the HBase user list as I didn't receive a reply there)

Hello,

I'm completely new to Spark and evaluating setting up a cluster either in YARN or standalone. Our idea for the general workflow is to create a concatenated dataframe using historical pickle/parquet files (whichever is faster) and current data stored in HBase. I'm aware of the benefit of short-circuit reads if the historical files are stored in HDFS, but I'm more concerned about resource contention between Spark and HBase during data loading. My question is, would running Spark on the same nodes provide a benefit when using hbase-connectors (https://github.com/apache/hbase-connectors/tree/master/spark)? Is there a mechanism in the connector to "pass through" a short-circuit read to Spark, or would data always bounce from HDFS -> RegionServer -> Spark?

Thanks in advance,
Aaron
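The UUIDv4 row-key approach described in this message (plain inserts keyed by the shared UUID instead of an MR-style join) can be illustrated with a small Python sketch. A plain dict stands in for the HBase table; this is not the HBase client API, and the function name, column names and event payloads are all hypothetical.

```python
import uuid


def upsert(table, row_key, column, value):
    """Write one column into the row identified by row_key, creating the row
    if needed. The dict 'table' is only a stand-in for a key-value store."""
    table.setdefault(row_key, {})[column] = value


def demo():
    table = {}
    shared_id = str(uuid.uuid4())  # both events carry the same UUIDv4

    # First event arrives and is inserted under the shared key.
    upsert(table, shared_id, "base_event", {"kind": "base"})
    # The matching event arrives later; inserting it under the same key
    # "joins" the two without scanning or shuffling either data set.
    upsert(table, shared_id, "follow_up", {"kind": "follow_up"})
    return table, shared_id


if __name__ == "__main__":
    rows, key = demo()
    print(sorted(rows[key]))
```

Because both writes land in the same row, reading that one key returns the matched pair, so the matching cost is paid at insert time rather than in a later join.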
Spark reading from HBase using hbase-connectors - any benefit from localization?
(cross-posting from the HBase user list as I didn't receive a reply there)

Hello,

I'm completely new to Spark and evaluating setting up a cluster either in YARN or standalone. Our idea for the general workflow is to create a concatenated dataframe using historical pickle/parquet files (whichever is faster) and current data stored in HBase. I'm aware of the benefit of short-circuit reads if the historical files are stored in HDFS, but I'm more concerned about resource contention between Spark and HBase during data loading. My question is, would running Spark on the same nodes provide a benefit when using hbase-connectors (https://github.com/apache/hbase-connectors/tree/master/spark)? Is there a mechanism in the connector to "pass through" a short-circuit read to Spark, or would data always bounce from HDFS -> RegionServer -> Spark?

Thanks in advance,
Aaron