Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190061592
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
     
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case r: UnsafeRowReceiverMessage =>
    -      queue.put(r)
    +      queues(r.writerId).put(r)
           context.reply(())
       }
     
       override def read(): Iterator[UnsafeRow] = {
         new NextIterator[UnsafeRow] {
    -      override def getNext(): UnsafeRow = queue.take() match {
    -        case ReceiverRow(r) => r
    -        case ReceiverEpochMarker() =>
    -          finished = true
    -          null
    +      private val numWriterEpochMarkers = new AtomicInteger(0)
    +
    +      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
    +      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
    +
    +      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
    +        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
           }
     
    -      override def close(): Unit = {}
    +      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
    +
    +      override def getNext(): UnsafeRow = {
    +        completion.take().get() match {
    --- End diff --
    
    `take().get()` can block indefinitely if something goes wrong, for example
if a marker is never received from a writer. Instead, I suggest a small change:
put this in a loop (also suggested below) and use `poll(timeout)`. Every time it
times out, log which writers, and how many, it is still waiting for. This would
be very helpful for debugging such issues without adding much complexity. The
timeout could be the epoch interval, because we expect markers to be received
within that period.
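
A minimal, standalone sketch of what such a polling loop could look like. It is
not the PR's code: `Msg`, `Row`, `EpochMarker`, `readEpoch`, `timeoutMs`, and
`log` are hypothetical stand-ins for `UnsafeRowReceiverMessage`, `ReceiverRow`,
`ReceiverEpochMarker`, and the receiver's own logging, and the timeout is assumed
to be passed in by the caller (for example, the epoch interval).

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

// Hypothetical stand-ins for the message types used in the diff.
sealed trait Msg { def writerId: Int }
case class Row(writerId: Int, value: String) extends Msg
case class EpochMarker(writerId: Int) extends Msg

object PollWithTimeoutSketch {
  /** Collects rows until every writer has sent its epoch marker,
    * reporting the pending writers each time the poll times out. */
  def readEpoch(
      queues: IndexedSeq[LinkedBlockingQueue[Msg]],
      timeoutMs: Long,
      log: String => Unit = println): Seq[String] = {
    val numWriters = queues.size
    val executor = Executors.newFixedThreadPool(numWriters)
    val completion = new ExecutorCompletionService[Msg](executor)

    def task(writerId: Int): Callable[Msg] = new Callable[Msg] {
      override def call(): Msg = queues(writerId).take()
    }
    // One outstanding take() per writer, as in the diff.
    (0 until numWriters).foreach(id => completion.submit(task(id)))

    // Writers whose epoch marker has not arrived yet.
    val pending = mutable.Set((0 until numWriters): _*)
    val rows = mutable.ArrayBuffer.empty[String]
    try {
      while (pending.nonEmpty) {
        // poll(timeout, unit) returns null on timeout instead of blocking forever.
        val future = completion.poll(timeoutMs, TimeUnit.MILLISECONDS)
        if (future == null) {
          val ids = pending.toSeq.sorted.mkString(", ")
          log(s"Still waiting for epoch markers from ${pending.size} writer(s): $ids")
        } else {
          future.get() match {
            case Row(id, value) =>
              rows += value
              completion.submit(task(id)) // keep reading from this writer
            case EpochMarker(id) =>
              pending -= id // this writer is done for the epoch
          }
        }
      }
    } finally {
      executor.shutdownNow()
    }
    rows.toSeq
  }
}
```

Note that this sketch keeps waiting (and logging) for as long as a marker is
missing; it only makes the stall visible and does not decide when to give up.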

