Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21428#discussion_r194957906

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

All RPC messages inside Spark are processed in [a shared fixed thread pool](https://github.com/apache/spark/blob/v2.3.1/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala#L197), so we cannot make blocking calls inside an RPC thread. I think we will eventually need to design a fundamental backpressure mechanism, because a receiver cannot stop a sender from sending data. For example, even if we block here, we cannot prevent the sender from sending more data, and it will eventually fill the TCP buffer. We also cannot rely on TCP backpressure alone, because we need to multiplex the same TCP connection in order to support thousands of machines.
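To illustrate the concern above, here is a minimal JVM sketch (plain Java, not Spark's actual `Dispatcher`; the pool size, queue capacity, and message strings are made up for the demo). A blocking `put` on a full bounded queue would park the shared dispatcher thread and stall every other RPC message behind it; a non-blocking `offer` fails fast and keeps the thread free:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DispatcherBlockingDemo {
    public static void main(String[] args) throws Exception {
        // A single shared thread standing in for an RPC dispatcher pool thread.
        ExecutorService dispatcher = Executors.newFixedThreadPool(1);
        // A bounded queue standing in for a per-writer receiver queue.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("row-0"); // the queue is now full

        // Non-blocking handoff: offer() returns false immediately instead of
        // parking the dispatcher thread the way a blocking put() would.
        Future<Boolean> handled = dispatcher.submit(() -> queue.offer("row-1"));
        System.out.println("offer accepted: " + handled.get());

        // Because the dispatcher thread never blocked, a later message on the
        // same shared thread is still processed promptly.
        Future<String> other = dispatcher.submit(() -> "other message handled");
        System.out.println(other.get());
        dispatcher.shutdown();
    }
}
```

Note the sketch only shows why the shared thread must not block; it does not solve the underlying problem, since a fast sender will still overrun the receiver until a real backpressure protocol tells it to slow down.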