[ https://issues.apache.org/jira/browse/SPARK-33896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255342#comment-17255342 ]
Xudingyu commented on SPARK-33896:
----------------------------------

[~sro...@scient.com] [~sro...@yahoo.com] [~sowen]

> Make Spark DAGScheduler datasource cache aware when scheduling tasks in a
> multi-replication HDFS
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33896
>                 URL: https://issues.apache.org/jira/browse/SPARK-33896
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Xudingyu
>            Priority: Critical
>
> *Goals:*
> • Make the Spark 3.0 scheduler datasource-cache-aware in a multi-replication
> HDFS cluster
> • Achieve a performance gain in end-to-end workloads when this feature is
> enabled
> *Problem Statement:*
> Spark’s DAGScheduler currently schedules tasks according to an RDD’s
> preferredLocations, which respect HDFS BlockLocation. In a multi-replication
> cluster, HDFS block locations can be returned as an Array[BlockLocation], and
> Spark chooses one of them to run tasks on. +However, tasks can run faster if
> they are scheduled to nodes that hold the datasource cache they need.
> Currently Spark has no mechanism for providing datasource cache locality
> when nodes in the cluster hold cached data+.
> This project aims to add a cache-locality-aware mechanism, so that the Spark
> DAGScheduler can schedule tasks to the nodes holding the datasource cache,
> according to cache locality in a multi-replication HDFS.
> *Basic idea:*
> The basic idea is to open a datasource cache locality provider interface in
> Spark, whose default implementation respects HDFS BlockLocation.
> The datasource cache metadata of worker nodes (such as offset and length)
> needs to be stored in an external DB such as Redis. The Spark driver can look
> up this cache metadata and customize the task scheduling locality algorithm
> to choose the most efficient node.
> *CBL (Cost Based Locality)*
> CBL (cost based locality) takes cache size, disk I/O, network I/O, and
> similar factors into account when scheduling tasks.
> Say there are 3 nodes A, B, and C in a 2-replication HDFS cluster. When
> Spark schedules task1, nodeB has a full replica on disk of the data that
> task1 needs, while nodeA has 20% of that data in its datasource cache and
> 50% of it as a replica on disk.
> We then calculate the cost of scheduling task1 on nodeA, nodeB, and nodeC:
> CostA = CalculateCost(20% read from cache) + CalculateCost(50% read from
> disk) + CalculateCost(30% read from remote)
> CostB = CalculateCost(100% read from disk)
> CostC = CalculateCost(100% read from remote)
> Return the node with the minimal cost.
> *Modifications:*
> A config is needed to decide which cache locality provider to use. It could
> be as follows:
> {code:java}
> SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL
> {code}
> For Spark 3.0, FilePartition.scala$preferredLocations() needs to be
> modified. It could be as follows:
> {code:java}
> override def preferredLocations(): Array[String] = {
>   Utils.classForName(SparkEnv.get.conf.get(SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL))
>     .getConstructor()
>     .newInstance()
>     .getPreferredLocs()
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
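
For readers following the CBL example in the quoted issue, the cost comparison for nodeA, nodeB, and nodeC could be sketched roughly as below. This is only an illustration under stated assumptions: `CostBasedLocality`, `NodeData`, `CostModel`, and `cheapestNode` are hypothetical names that do not exist in Spark, and the per-source weights are placeholders, not measured values. The issue does not define CalculateCost, so a simple weighted sum is assumed here.

```scala
// Hypothetical sketch of the cost-based locality idea; not Spark code.
object CostBasedLocality {

  // Fractions of a task's input that a node can serve from its
  // datasource cache and from a local on-disk HDFS replica.
  final case class NodeData(cacheFraction: Double, diskFraction: Double) {
    require(cacheFraction >= 0.0 && diskFraction >= 0.0,
      "fractions must be non-negative")
    require(cacheFraction + diskFraction <= 1.0 + 1e-9,
      "cache and disk fractions cannot exceed 100%")

    // Whatever is neither cached nor on local disk is read remotely.
    def remoteFraction: Double =
      math.max(0.0, 1.0 - cacheFraction - diskFraction)
  }

  // Assumed relative cost of reading one unit of data from each source.
  // The default weights are placeholders chosen for illustration only.
  final case class CostModel(
      cache: Double = 1.0,
      disk: Double = 4.0,
      remote: Double = 10.0) {

    // Weighted sum standing in for the issue's CalculateCost terms.
    def cost(node: NodeData): Double =
      cache * node.cacheFraction +
        disk * node.diskFraction +
        remote * node.remoteFraction
  }

  // Return the name of the node with the minimal estimated cost.
  def cheapestNode(nodes: Map[String, NodeData], model: CostModel): String = {
    require(nodes.nonEmpty, "at least one candidate node is required")
    nodes.minBy { case (_, data) => model.cost(data) }._1
  }
}
```

With the placeholder weights above and the fractions from the example (nodeA: 20% cache and 50% disk, nodeB: 100% disk, nodeC: 100% remote), the estimated costs are about 5.2, 4.0, and 10.0, so nodeB would be chosen. Different weights, for instance a much cheaper cache read or a more expensive disk read, could shift the choice to nodeA.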