This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 064604a  [SPARK-27073][CORE] Fix a race condition when handling of IdleStateEvent

064604a is described below

commit 064604aaa768de4c33425249be9c73948b2aeac9
Author: sychen <syc...@ctrip.com>
AuthorDate: Mon Mar 11 15:16:16 2019 -0700

    [SPARK-27073][CORE] Fix a race condition when handling of IdleStateEvent

    ## What changes were proposed in this pull request?

    When TransportChannelHandler processes an IdleStateEvent, it first checks whether the
    time since the last request exceeds the request timeout. If TransportClient.sendRpc
    initiates a new request at that moment, TransportChannelHandler then sees
    responseHandler.numOutstandingRequests() > 0 and closes a connection that is actually
    healthy.

    ## How was this patch tested?

    Closes #23989 from cxzl25/fix_IdleStateEvent_timeout.

    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../java/org/apache/spark/network/client/TransportResponseHandler.java | 2 +-
 .../java/org/apache/spark/network/server/TransportChannelHandler.java  | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 596b0ea..2f143f7 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -91,7 +91,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     streamCallbacks.offer(ImmutablePair.of(streamId, callback));
   }
 
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index ca81099..31371f6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -155,10 +155,11 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
       // To avoid a race between TransportClientFactory.createClient() and this code which could
       // result in an inactive client being returned, this needs to run in a synchronized block.
       synchronized (this) {
+        boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
         boolean isActuallyOverdue =
           System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
         if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-          if (responseHandler.numOutstandingRequests() > 0) {
+          if (hasInFlightRequests) {
             String address = getRemoteAddress(ctx.channel());
             logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
               "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
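The effect of reordering the two reads in TransportChannelHandler can be illustrated with a small, self-contained model. This is a sketch, not Spark code: ResponseState, sendRequest, closesConnectionOld, closesConnectionNew and the `between` hook are hypothetical names, a logical clock stands in for System.nanoTime(), and only the "outstanding requests" branch of the idle handler is modeled. The `between` hook runs a simulated sendRpc in the window between the two reads, which makes the problematic interleaving deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the idle-timeout check; not Spark code. */
public class IdleCheckSketch {

  /** Hypothetical stand-in for the bookkeeping kept by TransportResponseHandler. */
  static final class ResponseState {
    private final AtomicInteger outstanding = new AtomicInteger();
    private final AtomicLong timeOfLastRequestNs = new AtomicLong();  // starts at 0

    /** Records the request time first, then registers the request. */
    void sendRequest(long nowNs) {
      timeOfLastRequestNs.set(nowNs);
      outstanding.incrementAndGet();
    }

    int numOutstandingRequests() { return outstanding.get(); }

    long getTimeOfLastRequestNs() { return timeOfLastRequestNs.get(); }
  }

  /** Old ordering: overdue check first, outstanding count read afterwards. */
  static boolean closesConnectionOld(
      ResponseState s, long nowNs, long timeoutNs, Runnable between) {
    boolean isActuallyOverdue = nowNs - s.getTimeOfLastRequestNs() > timeoutNs;
    between.run();  // simulates a concurrent sendRpc landing in the gap
    return isActuallyOverdue && s.numOutstandingRequests() > 0;
  }

  /** New ordering: snapshot the outstanding count before the overdue check. */
  static boolean closesConnectionNew(
      ResponseState s, long nowNs, long timeoutNs, Runnable between) {
    boolean hasInFlightRequests = s.numOutstandingRequests() > 0;
    between.run();  // simulates a concurrent sendRpc landing in the gap
    boolean isActuallyOverdue = nowNs - s.getTimeOfLastRequestNs() > timeoutNs;
    return isActuallyOverdue && hasInFlightRequests;
  }

  public static void main(String[] args) {
    long timeoutNs = 100;
    long nowNs = 1_000;  // logical clock: no request since t=0, so the channel looks idle

    ResponseState oldState = new ResponseState();
    boolean oldCloses =
        closesConnectionOld(oldState, nowNs, timeoutNs, () -> oldState.sendRequest(nowNs));

    ResponseState newState = new ResponseState();
    boolean newCloses =
        closesConnectionNew(newState, nowNs, timeoutNs, () -> newState.sendRequest(nowNs));

    System.out.println("old ordering closes healthy connection: " + oldCloses);  // true
    System.out.println("new ordering closes healthy connection: " + newCloses);  // false
  }
}
```

The new ordering holds up because the model's sendRequest, like addStreamCallback in the diff, updates the last-request time before registering the request. If the idle check sees no in-flight request, it does not close the connection. If it does see one, the timestamp it reads afterwards already reflects that request. A request that is genuinely stuck (sent at t=0 and never answered) still makes closesConnectionNew return true, so dead connections are still detected.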