This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ff786  [SPARK-37984][SHUFFLE] Avoid calculating all outstanding 
requests to improve performance
f8ff786 is described below

commit f8ff7863e792b833afb2ff603878f29d4a9888e6
Author: weixiuli <weixi...@jd.com>
AuthorDate: Sun Jan 23 20:23:20 2022 -0600

    [SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to 
improve performance
    
    ### What changes were proposed in this pull request?
    
    Avoid calculating all outstanding requests to improve performance.
    
    ### Why are the changes needed?
    
    Following the review comment
    (https://github.com/apache/spark/pull/34711#pullrequestreview-835520984), we
    can implement a "has outstanding requests" method in the response handler that
    doesn't even need to compute a count. This PR adds that method.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #35276 from weixiuli/SPARK-37984.
    
    Authored-by: weixiuli <weixi...@jd.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../apache/spark/network/client/TransportResponseHandler.java  | 10 ++++++++--
 .../apache/spark/network/server/TransportChannelHandler.java   |  3 +--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 576c088..261f205 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -140,7 +140,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
 
   @Override
   public void channelInactive() {
-    if (numOutstandingRequests() > 0) {
+    if (hasOutstandingRequests()) {
       String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} 
is closed",
         numOutstandingRequests(), remoteAddress);
@@ -150,7 +150,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
 
   @Override
   public void exceptionCaught(Throwable cause) {
-    if (numOutstandingRequests() > 0) {
+    if (hasOutstandingRequests()) {
       String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} 
is closed",
         numOutstandingRequests(), remoteAddress);
@@ -275,6 +275,12 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       (streamActive ? 1 : 0);
   }
 
+  /** Check if there are any outstanding requests (fetch requests + rpcs) */
+  public Boolean hasOutstandingRequests() {
+    return streamActive || !outstandingFetches.isEmpty() || 
!outstandingRpcs.isEmpty() ||
+        !streamCallbacks.isEmpty();
+  }
+
   /** Returns the time in nanoseconds of when the last request was sent out. */
   public long getTimeOfLastRequestNs() {
     return timeOfLastRequestNs.get();
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 275e64e..d197032 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -161,8 +161,7 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
         boolean isActuallyOverdue =
           System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > 
requestTimeoutNs;
         if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-          boolean hasInFlightRequests = 
responseHandler.numOutstandingRequests() > 0;
-          if (hasInFlightRequests) {
+          if (responseHandler.hasOutstandingRequests()) {
             String address = getRemoteAddress(ctx.channel());
             logger.error("Connection to {} has been quiet for {} ms while 
there are outstanding " +
               "requests. Assuming connection is dead; please adjust" +

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to