[ https://issues.apache.org/jira/browse/SPARK-33896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255342#comment-17255342 ]

Xudingyu commented on SPARK-33896:
----------------------------------

[~sro...@scient.com][~sro...@yahoo.com][~sowen]

> Make Spark DAGScheduler datasource cache aware when scheduling tasks in a 
> multi-replication HDFS
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33896
>                 URL: https://issues.apache.org/jira/browse/SPARK-33896
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Xudingyu
>            Priority: Critical
>
> *Goals:*
> •     Make the Spark 3.0 scheduler datasource-cache-aware in a multi-replication 
> HDFS cluster
> •     Achieve an E2E workload performance gain when this feature is enabled
> *Problem Statement:*
> Spark’s DAGScheduler currently schedules tasks according to each RDD’s 
> preferredLocations, which respect HDFS BlockLocations. In a multi-replication 
> cluster, HDFS returns an Array[BlockLocation] per block, and Spark chooses one 
> of those BlockLocations to run the task on. +However, tasks can run faster if 
> they are scheduled to the nodes holding the datasource cache they need. 
> Currently there is no datasource cache locality provision mechanism in Spark 
> when nodes in the cluster hold cached data+.
> This project aims to add a cache-locality-aware mechanism, so that the Spark 
> DAGScheduler can schedule tasks to the nodes with datasource cache, according 
> to cache locality, in a multi-replication HDFS cluster.
> *Basic idea:*
> The basic idea is to expose a datasource cache locality provider interface in 
> Spark, whose default implementation respects HDFS BlockLocations. Each worker 
> node’s datasource cache metadata (such as offset and length) is stored in an 
> external DB such as Redis. The Spark driver can then look up this cache 
> metadata and apply a customized task locality algorithm to choose the most 
> efficient node, as sketched below.
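> A minimal sketch of what such a provider interface could look like is shown 
> below; the trait and class names (PreferredLocsProvider, 
> HdfsPreferredLocsProvider) are illustrative assumptions, not existing Spark APIs.
> {code:java}
> // Hypothetical provider interface; all names here are illustrative only.
> trait PreferredLocsProvider {
>   // Hosts on which the current FilePartition should preferably be scheduled.
>   def getPreferredLocs(): Array[String]
> }
> 
> // Default implementation: keep today's behavior and simply respect the HDFS
> // BlockLocation hosts already attached to the partition's files.
> class HdfsPreferredLocsProvider extends PreferredLocsProvider {
>   override def getPreferredLocs(): Array[String] = {
>     // In a real patch this would reuse FilePartition's existing
>     // "top hosts by bytes" logic over PartitionedFile.locations.
>     Array.empty[String]
>   }
> }
> {code}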
> *CBL (Cost-Based Locality)*
> CBL (cost-based locality) takes cache size, disk IO, network IO, etc. into 
> account when scheduling tasks.
> Say there are 3 nodes A, B, and C in a 2-replication HDFS cluster. When Spark 
> schedules task1, nodeB has a full on-disk replica of the data task1 needs, 
> while nodeA has 20% of that data in its datasource cache and 50% of it on disk.
> We then calculate the cost of scheduling task1 on nodeA, nodeB, and nodeC:
> CostA = CalculateCost(20% read from cache) + CalculateCost(50% read from 
> disk) + CalculateCost(30% read from remote)
> CostB = CalculateCost(100% read from disk)
> CostC = CalculateCost(100% read from remote)
> The task is scheduled on the node with the minimal cost, as in the sketch below.
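> A rough sketch of this cost model is shown below; the per-byte cost weights 
> and the NodeCacheInfo shape are illustrative assumptions.
> {code:java}
> // Illustrative cost model; the per-byte weights are assumptions, not measured values.
> case class NodeCacheInfo(host: String, cacheFraction: Double, diskFraction: Double)
> 
> object CostBasedLocality {
>   private val CacheCostPerByte  = 1.0   // read from datasource cache
>   private val DiskCostPerByte   = 5.0   // read from local disk
>   private val RemoteCostPerByte = 20.0  // read from a remote node
> 
>   private def cost(node: NodeCacheInfo): Double = {
>     val remoteFraction = math.max(0.0, 1.0 - node.cacheFraction - node.diskFraction)
>     node.cacheFraction * CacheCostPerByte +
>       node.diskFraction * DiskCostPerByte +
>       remoteFraction * RemoteCostPerByte
>   }
> 
>   // Pick the node with the minimal estimated read cost.
>   def choose(nodes: Seq[NodeCacheInfo]): String = nodes.minBy(cost).host
> }
> 
> // Example from the text: nodeA = 20% cached + 50% on disk, nodeB = 100% on disk,
> // nodeC = everything remote. With these particular weights CostA = 8.7,
> // CostB = 5.0, CostC = 20.0, so nodeB is chosen; different weights can favor nodeA.
> // CostBasedLocality.choose(Seq(
> //   NodeCacheInfo("nodeA", 0.2, 0.5),
> //   NodeCacheInfo("nodeB", 0.0, 1.0),
> //   NodeCacheInfo("nodeC", 0.0, 0.0)))
> {code}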
> *Modifications:*
> A config is needed to decide which cache locality provider to use; it could 
> look as follows:
> {code:java}
> SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL
> {code}
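> One possible way to declare this config inside 
> org.apache.spark.sql.internal.SQLConf is sketched below; the key string and 
> the default class name are assumptions.
> {code:java}
> // Sketch of the config entry inside SQLConf; key and default are assumptions.
> val PARTITIONED_FILE_PREFERREDLOC_IMPL =
>   buildConf("spark.sql.files.partitionedFilePreferredLocImpl")
>     .doc("Fully qualified class name of the datasource cache locality provider " +
>       "used to compute preferred locations for a FilePartition.")
>     .stringConf
>     .createWithDefault("org.apache.spark.sql.execution.datasources.HdfsPreferredLocsProvider")
> {code}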
> For Spark 3.0, preferredLocations() in FilePartition.scala needs to be 
> modified, for example as follows:
> {code:java}
> override def preferredLocations(): Array[String] = {
>   // Instantiate the configured cache locality provider via reflection and
>   // delegate the preferred-location computation to it. The cast target
>   // (PreferredLocsProvider) is the hypothetical provider interface sketched above.
>   Utils.classForName(SparkEnv.get.conf.get(SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL))
>     .getConstructor()
>     .newInstance()
>     .asInstanceOf[PreferredLocsProvider]
>     .getPreferredLocs()
> }
> {code}
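> For completeness, a provider implementation could fetch the worker-side cache 
> metadata from Redis roughly as in the helper sketched below; the Redis key 
> layout (a hash of host -> cached bytes per file path), the host/port, and all 
> names are illustrative assumptions.
> {code:java}
> import redis.clients.jedis.Jedis
> import scala.collection.JavaConverters._
> 
> // Illustrative helper a provider implementation could call; the key layout
> // "datasource-cache:<file path>" -> {host: cachedBytes} is an assumption.
> object RedisCacheMetaLookup {
>   def preferredHosts(filePath: String, hdfsHosts: Array[String]): Array[String] = {
>     val jedis = new Jedis("localhost", 6379)
>     try {
>       val cachedBytesByHost = jedis.hgetAll("datasource-cache:" + filePath)
>         .asScala
>         .map { case (host, bytes) => host -> bytes.toLong }
>       if (cachedBytesByHost.isEmpty) {
>         // No cache metadata published: fall back to the HDFS BlockLocation hosts.
>         hdfsHosts
>       } else {
>         // Prefer the hosts holding the most cached bytes for this file.
>         cachedBytesByHost.toSeq.sortBy { case (_, bytes) => -bytes }.map(_._1).toArray
>       }
>     } finally {
>       jedis.close()
>     }
>   }
> }
> {code}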


