Hi,

We're starting to use Spark 2 with use cases for Dynamic Allocation.
However, we noticed that it does not work as expected when a dataset is
cached and uncached (persist/unpersist).
The cluster runs with:
CDH 5.15.0
Spark 2.3.0
Oracle Java 8.131

The following configs are passed to Spark (and are also set up on the cluster):
# Dynamic Allocation
spark.shuffle.service.enabled                                      true
spark.dynamicAllocation.enabled                                    true

spark.dynamicAllocation.schedulerBacklogTimeout                    1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout           1
spark.dynamicAllocation.executorIdleTimeout                        90

spark.dynamicAllocation.initialExecutors                           1
spark.dynamicAllocation.minExecutors                               1
spark.dynamicAllocation.maxExecutors                               30
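
For completeness, the same settings can also be passed at submit time; a sketch
(the application class and jar names are placeholders, not our actual job):

```
spark-submit \
  --master yarn \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  --class com.example.App app.jar
```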

The cluster also has these configs enabled; spark_shuffle is set up and the
YARN application classpath is populated. The executors' storage is freed when
the application finishes (based on:
https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service)
Below is simplified code that reproduces the issue on our cluster (HA YARN).

When the following code is executed with "cache=false", the executors are
created, used, and killed by the idle timeout.
When "cache=true", the executors are created and used, but they are never
killed and remain hanging.

In both cases the cached storage was cleaned up.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.storage.StorageLevel;

// sparkSession and cache are fields of the enclosing class
void run() {
    List<O1> objList = new ArrayList<>();
    for (long i = 0; i < 1000; i++) {
        objList.add(new O1(i, "test"));
    }

    Dataset<O1> ds = sparkSession.createDataset(objList, Encoders.bean(O1.class));
    ds = ds.repartition(4);

    if (cache) {
        ds.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            ds.show(100, false);
        } finally {
            ds.unpersist();
        }
    } else {
        ds.show(100, false);
    }
}



// O1 POJO class:
public class O1 {
    private Long transactionDate;
    private String name;

    public O1() {
    }

    public O1(Long transactionDate, String name) {
        this.transactionDate = transactionDate;
        this.name = name;
    }

    public Long getTransactionDate() {
        return transactionDate;
    }

    public void setTransactionDate(Long transactionDate) {
        this.transactionDate = transactionDate;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Moreover, when spark.dynamicAllocation.cachedExecutorIdleTimeout is set to a
finite value, the containers are killed successfully, even if they held cached
data (this check was inspired by:
https://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors)
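
For reference, this is the kind of setting we tested, in the same style as the
configs above (600 seconds is an illustrative value, not a recommendation):

```
# Reclaim idle executors even when they hold cached blocks
spark.dynamicAllocation.cachedExecutorIdleTimeout                  600
```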

Unfortunately, in the future we will have both containers that keep cached
data and may live for a long time, and containers that free their cache
(unpersist) and are expected to be killed, along with idling executors.

Is this a bug, or is some configuration missing?

Best regards,
Sergejs Andrejevs
