Hi everyone, I'm trying to use PySpark 3.3.2. I have these relevant options set:
----
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.shuffleTracking.timeout=20s
spark.dynamicAllocation.executorIdleTimeout=30s
spark.dynamicAllocation.cachedExecutorIdleTimeout=40s
spark.executor.instances=0
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.master=k8s://https://k8s-api.<....>:6443
----

So I'm using Kubernetes to deploy up to 20 executors. Then I run this piece of code:

----
import time

df = spark.read.parquet("s3a://<directory with ~1.6TB of parquet files>")
print(df.count())
time.sleep(999)
----

This works fine and as expected: during execution ~1600 tasks are completed, 20 executors get deployed, and they are quickly removed after the calculation is complete.

Next, I add these to the config:

----
spark.decommission.enabled=true
spark.storage.decommission.shuffleBlocks.enabled=true
spark.storage.decommission.enabled=true
spark.storage.decommission.rddBlocks.enabled=true
----

I repeat the experiment on an empty Kubernetes cluster, so that no actual pod eviction is occurring. This time executor deallocation does not work as expected: depending on the run, after the job is complete, 0-3 executors out of 20 remain present forever and never seem to get removed.

I tried to debug the code and found that, inside the 'ExecutorMonitor.timedOutExecutors' function, the executors that never get removed don't make it into the 'timedOutExecs' variable, because the 'hasActiveShuffle' property remains 'true' for them.

I'm a little stuck here trying to understand how pod management, shuffle tracking and decommissioning are supposed to work together, how to debug this, and whether this is expected behavior at all (to me it is not). Thank you for any hints!
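
In case it helps with debugging, I was planning to enable debug logging for the allocation and monitoring classes to see why 'hasActiveShuffle' stays 'true'. Below is a minimal sketch of the additions to conf/log4j2.properties (assuming the default log4j2 setup shipped with Spark 3.3; the logger key names are just labels I picked, and I'm guessing these are the classes that log the relevant decisions):

----
# Debug logging for the executor idle-timeout / shuffle-tracking logic
logger.execmonitor.name = org.apache.spark.scheduler.dynalloc.ExecutorMonitor
logger.execmonitor.level = debug
# Debug logging for the dynamic allocation manager itself
logger.allocmanager.name = org.apache.spark.ExecutorAllocationManager
logger.allocmanager.level = debug
----

I'd hope this shows which shuffles the monitor still considers active on the stuck executors, but I'm not sure it's the right place to look.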