Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190059433
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
     
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case r: UnsafeRowReceiverMessage =>
    -      queue.put(r)
    +      queues(r.writerId).put(r)
           context.reply(())
       }
     
       override def read(): Iterator[UnsafeRow] = {
         new NextIterator[UnsafeRow] {
    -      override def getNext(): UnsafeRow = queue.take() match {
    -        case ReceiverRow(r) => r
    -        case ReceiverEpochMarker() =>
    -          finished = true
    -          null
    +      private val numWriterEpochMarkers = new AtomicInteger(0)
    +
    +      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
    +      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
    +
    +      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
    +        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
           }
     
    -      override def close(): Unit = {}
    +      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
    +
    +      override def getNext(): UnsafeRow = {
    +        completion.take().get() match {
    +          case ReceiverRow(writerId, r) =>
    +            // Start reading the next element in the queue we just took from.
    +            completion.submit(completionTask(writerId))
    +            r
    +            // TODO use writerId
    --- End diff --
    
    What is this TODO for?
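    
    For context on the pattern this diff introduces: an ExecutorCompletionService runs one blocking take() task per writer queue, so getNext() can consume from whichever queue produces a message first instead of polling the queues in turn. A minimal standalone sketch of that pattern, not taken from the PR (the names `numQueues`, `task`, and the String payload are illustrative):
    
        import java.util.concurrent.{Callable, ExecutorCompletionService, Executors, LinkedBlockingQueue}
    
        object CompletionServiceSketch {
          def main(args: Array[String]): Unit = {
            val numQueues = 3 // stand-in for numShuffleWriters
            val queues = Array.fill(numQueues)(new LinkedBlockingQueue[String]())
    
            // One pool thread per queue; each task blocks on its own queue's take().
            val executor = Executors.newFixedThreadPool(numQueues)
            val completion = new ExecutorCompletionService[(Int, String)](executor)
    
            def task(id: Int) = new Callable[(Int, String)] {
              override def call(): (Int, String) = (id, queues(id).take())
            }
            (0 until numQueues).foreach(id => completion.submit(task(id)))
    
            // Messages arrive on arbitrary queues...
            queues(2).put("from writer 2")
            queues(0).put("from writer 0")
    
            // ...and completion.take() hands back whichever task finished first.
            for (_ <- 1 to 2) {
              val (id, msg) = completion.take().get()
              println(s"queue $id produced: $msg")
              completion.submit(task(id)) // resubscribe to the queue we just drained
            }
            executor.shutdownNow() // interrupts the task still blocked on queue 1
          }
        }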
