Hi everyone

I'm trying to use pyspark 3.3.2.
I have these relevant options set:

----
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.shuffleTracking.timeout=20s
spark.dynamicAllocation.executorIdleTimeout=30s
spark.dynamicAllocation.cachedExecutorIdleTimeout=40s
spark.executor.instances=0
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.master=k8s://https://k8s-api.<....>:6443
----
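
For context, here is a minimal sketch of how such a session could be built programmatically (a sketch only: the app name is arbitrary, the k8s endpoint stays elided, and setting the same keys via spark-submit / spark-defaults.conf is equivalent):

----
from pyspark.sql import SparkSession

# Minimal sketch; endpoint and app name are placeholders.
spark = (
    SparkSession.builder
    .appName("dynalloc-decommission-test")
    .config("spark.master", "k8s://https://k8s-api.<....>:6443")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.timeout", "20s")
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "40s")
    .config("spark.executor.instances", "0")
    .config("spark.dynamicAllocation.minExecutors", "0")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .getOrCreate()
)
----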

So I'm using Kubernetes to deploy up to 20 executors.

Then I run this piece of code:
----
import time

df = spark.read.parquet("s3a://<directory with ~1.6TB of parquet files>")
print(df.count())  # triggers a full scan, ~1600 tasks
time.sleep(999)    # keep the app alive to watch executor deallocation
----

This works fine and as expected: during the run ~1600 tasks complete, 20
executors get deployed, and they are quickly removed once the computation
finishes.

Next, I add these to the config:
----
spark.decommission.enabled=true
spark.storage.decommission.shuffleBlocks.enabled=true
spark.storage.decommission.enabled=true
spark.storage.decommission.rddBlocks.enabled=true
----
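
A quick sanity check on the driver, along these lines, shows the keys are
picked up:

----
# list the decommission / shuffle-tracking confs the driver actually sees
for key, value in spark.sparkContext.getConf().getAll():
    if "decommission" in key or "shuffleTracking" in key:
        print(key, "=", value)
----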

I repeat the experiment on an empty Kubernetes cluster, so no actual pod
eviction is occurring.

This time executor deallocation does not work as expected: depending on the
run, after the job completes, 0-3 of the 20 executors remain present forever
and never get removed.

I tried to debug the code and found that inside the
'ExecutorMonitor.timedOutExecutors' method, the executors that never get
removed do not make it into the 'timedOutExecs' variable, because their
'hasActiveShuffle' property remains 'true'.
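
For anyone trying to reproduce this, DEBUG logging for the classes involved
should show the shuffle-tracking state transitions; with the log4j2-based
logging in Spark 3.3, roughly this in conf/log4j2.properties (the class names
are my best guess from the 3.3 source tree):

----
# assumed class locations; adjust if they differ in your build
logger.execmonitor.name = org.apache.spark.scheduler.dynalloc.ExecutorMonitor
logger.execmonitor.level = debug
logger.allocmgr.name = org.apache.spark.ExecutorAllocationManager
logger.allocmgr.level = debug
----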

I'm a little stuck here trying to understand how pod management, shuffle
tracking and decommissioning are supposed to work together, how to debug this
further, and whether this is expected behavior at all (to me it is not).

Thank you for any hints!
