Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190061592 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { - case ReceiverRow(r) => r - case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { + override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { + completion.take().get() match { --- End diff -- `take().get()` can block indefinitely if there happen to be issues, such as a marker never being received from a writer. Instead, I suggest a small change: put this in a loop (also suggested below) and use `poll(timeout)`. Every time it times out, print the status of which / how many writers it is still waiting for. This would be super helpful for debugging such issues, without adding much complexity. The timeout could be the epoch time interval, because we expect markers to be received within that period.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org