Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job,
but it still was not stable in the face of spot instances going away. Adding
spark.decommission.enabled=true, spark.storage.decommission.enabled=true and
spark.executor.decommission.killInterval=110 appears to have completely
stabilized the job (I'm not sure which one did the trick, as I added them all
at the same time). Perhaps extra documentation or clarification should be
added, as it isn't clear to me how to arrive at job stability with dynamic
allocation without trial and error.
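
For anyone who hits the same thing, this is the full set of properties I ended
up adding on top of the spark-defaults.conf quoted below (the last three went
in together, so I can't say which of them is strictly required):

# added on top of the config quoted below
spark.shuffle.useOldFetchProtocol            true
spark.decommission.enabled                   true
spark.storage.decommission.enabled           true
spark.executor.decommission.killInterval     110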

On Mon, 2024-08-19 at 13:01 +0000, Aaron Grubb wrote:
> Hi all,
>
> I'm running Spark on Kubernetes on AWS using only spot instances for
> executors with dynamic allocation enabled. This particular job is being
> triggered by Airflow and it hit this bug [1] 6 times in a row. However,
> I had recently switched to using PersistentVolumeClaims in Spark with
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon
> review, I see under the notes for spark.dynamicAllocation.enabled [2]
> that these configurations are "or" not "and". However, when setting
> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes
> with the message
>
> org.apache.spark.SparkException: Dynamic allocation of executors
> requires one of the following conditions: 1) enabling external shuffle
> service through spark.shuffle.service.enabled. 2) enabling shuffle
> tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3)
> enabling shuffle blocks decommission through spark.decommission.enabled
> and spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental)
> configuring spark.shuffle.sort.io.plugin.class to use a custom
> ShuffleDataIO who's ShuffleDriverComponents supports reliable storage.
>
> Am I hitting this bug unavoidably? Or is there a configuration I'm
> missing to enable
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> to replace spark.dynamicAllocation.shuffleTracking.enabled=true?
>
> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>
> spark.checkpoint.compress                                    true
> spark.driver.cores                                           1
> spark.driver.maxResultSize                                    2g
> spark.driver.memory                                           5140m
> spark.dynamicAllocation.enabled                               true
> spark.dynamicAllocation.executorAllocationRatio               0.33
> spark.dynamicAllocation.maxExecutors                          20
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout      30
> spark.eventLog.enabled                                        true
> spark.executor.cores                                          3
> spark.executor.logs.rolling.enableCompression                 true
> spark.executor.logs.rolling.maxRetainedFiles                  48
> spark.executor.logs.rolling.strategy                          time
> spark.executor.logs.rolling.time.interval                     hourly
> spark.hadoop.fs.s3a.impl                                      org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.connection.ssl.enabled                    false
> spark.hadoop.fs.s3a.fast.upload                               true
> spark.kryo.registrationRequired                               false
> spark.kryo.unsafe                                             false
> spark.kryoserializer.buffer                                   1m
> spark.kryoserializer.buffer.max                               1g
> spark.kubernetes.driver.limit.cores                           750m
> spark.kubernetes.driver.ownPersistentVolumeClaim              true
> spark.kubernetes.driver.request.cores                         750m
> spark.kubernetes.driver.reusePersistentVolumeClaim            true
> spark.kubernetes.driver.waitToReusePersistentVolumeClaim      true
> spark.kubernetes.executor.limit.cores                         3700m
> spark.kubernetes.executor.request.cores                       3700m
> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName       OnDemand
> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path              /data/spark-x/executor-x
> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly          false
> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit       20Gi
> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass    ebs-sc
> spark.kubernetes.namespace                                    spark
> spark.serializer                                              org.apache.spark.serializer.KryoSerializer
> spark.shuffle.sort.io.plugin.class                            org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> spark.sql.orc.compression.codec                               zlib
> spark.sql.pyspark.jvmStacktrace.enabled                       true
> spark.sql.sources.partitionOverwriteMode                      dynamic
> spark.sql.streaming.kafka.useDeprecatedOffsetFetching         false
> spark.submit.deployMode                                       cluster
>
> Thanks,
> Aaron
>
> [1] https://issues.apache.org/jira/browse/SPARK-45858
> [2] https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
>


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
