HIVE-16692 addendum. LLAP: Keep alive connection in shuffle handler should not be closed until entire data is flushed out. (Rajesh Balamohan, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3671668b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3671668b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3671668b Branch: refs/heads/hive-14535 Commit: 3671668b667be4fd2261ecac2e5bd66c5d282cf9 Parents: 497f119 Author: Siddharth Seth <[email protected]> Authored: Thu May 18 12:06:01 2017 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu May 18 12:06:01 2017 -0700 ---------------------------------------------------------------------- .../hive/llap/shufflehandler/ShuffleHandler.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3671668b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index f63375c..e90f0df 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -797,8 +797,18 @@ public class ShuffleHandler implements AttemptRegistrationListener { if (!keepAliveParam && !connectionKeepAliveEnabled) { lastMap.addListener(ChannelFutureListener.CLOSE); } else { - // Entire response is written out. Safe to enable timeout handling. - timeoutHandler.setEnabledTimeout(true); + lastMap.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // On error close the channel. + future.getChannel().close(); + return; + } + // Entire response is written out. Safe to enable timeout handling. + timeoutHandler.setEnabledTimeout(true); + } + }); } }
