[ 
https://issues.apache.org/jira/browse/SPARK-29852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen resolved SPARK-29852.
----------------------------------
    Resolution: Duplicate

> Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-29852
>                 URL: https://issues.apache.org/jira/browse/SPARK-29852
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Peng Cheng
>            Priority: Major
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> Both the RDD and Dataset APIs have 2 methods of collecting data from executors to 
> the driver:
>  
>  # .collect() sets up multiple threads in a single job and dumps all data from the 
> executors into the driver's memory. This is great if the data needs to be 
> accessible on the driver ASAP, but not as efficient if the partitions can only be 
> accessed sequentially, and outright risky if the driver doesn't have enough 
> memory to hold all the data.
> - the solution for issue SPARK-25224 partially alleviates this by delaying 
> deserialisation of the data in InternalRow format, such that only the much 
> smaller serialised data needs to be held entirely in driver memory. This 
> solution still does not keep memory consumption at O(1), and thus does not 
> scale to arbitrarily large datasets.
>  # .toLocalIterator() fetches one partition in one job at a time, and fetching of 
> the next partition does not start until sequential access to the previous 
> partition has concluded. This keeps memory consumption at O(1) and is great if 
> access to the data is sequential and significantly slower than the rate at which 
> a single executor can ship partitions with one thread. It becomes inefficient 
> when sequential access to the data has to wait a relatively long time for the 
> next partition to be shipped. (A minimal sketch of both existing access patterns 
> follows this list.)
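> For reference, a minimal sketch of the two existing access patterns (illustrative 
> only; it assumes a SparkContext named sc and an example RDD):
> {code:java}
> val rdd = sc.parallelize(1 to 1000000, 100)
> 
> // collect(): a single job fetches all partitions in parallel;
> // the driver must hold the entire dataset in memory at once.
> val all: Array[Int] = rdd.collect()
> 
> // toLocalIterator(): one job per partition, fetched lazily;
> // driver memory stays bounded by one partition, but each partition
> // is only requested after the previous one has been fully consumed.
> rdd.toLocalIterator.foreach { v => () }
> {code}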
> The proposed solution is a crossover between the two existing implementations: a 
> concurrent subroutine that is bounded in both CPU and memory. It allocates a 
> fixed-size resource pool (by default equal to the number of available CPU cores) 
> that serves the shipping of partitions concurrently, and blocks sequential access 
> to a partition's data until its shipping has finished (which usually happens 
> without blocking for partitionID >= 2, since shipping starts much earlier and 
> preemptively). A tenant of the resource pool can be GC'ed and evicted once 
> sequential access to its data has finished, which allows further partitions to be 
> fetched well before they are accessed. The maximum memory consumption is 
> O(m * n), where m is the predefined concurrency and n is the size of the largest 
> partition.
> The following Scala code snippet demonstrates a simple implementation:
>  
> (requires Scala 2.11+ and ScalaTest)
>  
> {code:java}
> package org.apache.spark.spike
> import java.util.concurrent.ArrayBlockingQueue
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.{FutureAction, SparkContext}
> import org.scalatest.FunSpec
> import scala.concurrent.Future
> import scala.language.implicitConversions
> import scala.reflect.ClassTag
> import scala.util.{Failure, Success, Try}
> class ToLocalIteratorPreemptivelySpike extends FunSpec {
>   import ToLocalIteratorPreemptivelySpike._
>   lazy val sc: SparkContext =
>     SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>   it("can be much faster than toLocalIterator") {
>     val max = 80
>     val delay = 100
>     val slowRDD = sc.parallelize(1 to max, 8).map { v =>
>       Thread.sleep(delay)
>       v
>     }
>     val (r1, t1) = timed {
>       slowRDD.toLocalIterator.toList
>     }
>     val capacity = 4
>     val (r2, t2) = timed {
>       slowRDD.toLocalIteratorPreemptively(capacity).toList
>     }
>     assert(r1 == r2)
>     println(s"linear: $t1, preemptive: $t2")
>     assert(t1 > t2 * 2)
>     assert(t2 > max * delay / capacity)
>   }
> }
> object ToLocalIteratorPreemptivelySpike {
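>   // Wraps one partition of an RDD together with the asynchronous job that fetches it to the driver.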
>   case class PartitionExecution[T: ClassTag](
>       @transient self: RDD[T],
>       id: Int
>   ) {
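>     // Forces submission of the single-partition job without waiting for its result.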
>     def eager: this.type = {
>       AsArray.future
>       this
>     }
>     case object AsArray {
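>       // Submits a Spark job over just this partition; the result handler stores
>       // its rows (as an Array) on the driver, which becomes the future's value.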
>       @transient lazy val future: FutureAction[Array[T]] = {
>         var result: Array[T] = null
>         val future = self.context.submitJob[T, Array[T], Array[T]](
>           self,
>           _.toArray,
>           Seq(id), { (_, data) =>
>             result = data
>           },
>           result
>         )
>         future
>       }
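>       // Blocks until the partition's job has finished, then returns its rows.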
>       @transient lazy val now: Array[T] = future.get()
>     }
>   }
>   implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {
>     import scala.concurrent.ExecutionContext.Implicits.global
>     def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
>       val executions = self.partitions.indices.map { ii =>
>         PartitionExecution(self, ii)
>       }
>       val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)
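>       // Producer: enqueue the partitions in order and pre-submit each one's job.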
>       Future {
>         executions.foreach { exe =>
>           buffer.put(Success(exe)) // may be blocking due to capacity
>           exe.eager // non-blocking
>         }
>       }.onFailure {
>         case e: Throwable =>
>           buffer.put(Failure(e))
>       }
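>       // Consumer: yield partitions in order; take() waits for the producer,
>       // AsArray.now blocks until that partition's job has completed.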
>       self.partitions.indices.toIterator.map { _ =>
>         val exe = buffer.take().get
>         exe.AsArray.now
>       }
>     }
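>     // Public entry point: flattens the per-partition arrays into a single Iterator[T].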
>     def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
>       _toLocalIteratorPreemptively(capacity).flatten
>     }
>   }
>   def timed[T](fn: => T): (T, Long) = {
>     val startTime = System.currentTimeMillis()
>     val result = fn
>     val endTime = System.currentTimeMillis()
>     (result, endTime - startTime)
>   }
> }
> {code}
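> A minimal usage sketch of the proposed API, assuming the implicit RDDFunctions 
> above is in scope and a SparkContext named sc is available (the example RDD and 
> capacity value are illustrative only):
> {code:java}
> import ToLocalIteratorPreemptivelySpike._
> 
> val rdd = sc.parallelize(1 to 1000, 16)
> // Up to `capacity` partitions are shipped ahead of the consumer, so driver
> // memory stays bounded by roughly capacity * (size of the largest partition).
> rdd.toLocalIteratorPreemptively(capacity = 4).foreach(println)
> {code}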



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
