Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21560#discussion_r196999896
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
    @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
         numPartitions: Int,
         queueSize: Int = 1024,
         numShuffleWriters: Int = 1,
    -    epochIntervalMs: Long = 1000)
    +    epochIntervalMs: Long = 1000,
    +    val endpointNames: Seq[String] = 
Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
       extends RDD[UnsafeRow](sc, Nil) {
     
       override protected def getPartitions: Array[Partition] = {
         (0 until numPartitions).map { partIndex =>
    -      ContinuousShuffleReadPartition(partIndex, queueSize, 
numShuffleWriters, epochIntervalMs)
    +      ContinuousShuffleReadPartition(
    +        partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, 
epochIntervalMs)
    --- End diff --
    
    This effectively asserting numPartitions to be 1, otherwise it will throw 
exception.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to