HIVE-16692 addendum. LLAP: Keep-alive connection in the shuffle handler should
not be closed until all data is flushed out. (Rajesh Balamohan, reviewed by
Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3671668b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3671668b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3671668b

Branch: refs/heads/hive-14535
Commit: 3671668b667be4fd2261ecac2e5bd66c5d282cf9
Parents: 497f119
Author: Siddharth Seth <[email protected]>
Authored: Thu May 18 12:06:01 2017 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Thu May 18 12:06:01 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/shufflehandler/ShuffleHandler.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3671668b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index f63375c..e90f0df 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -797,8 +797,18 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       if (!keepAliveParam && !connectionKeepAliveEnabled) {
         lastMap.addListener(ChannelFutureListener.CLOSE);
       } else {
-        // Entire response is written out. Safe to enable timeout handling.
-        timeoutHandler.setEnabledTimeout(true);
+        lastMap.addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+              // On error close the channel.
+              future.getChannel().close();
+              return;
+            }
+            // Entire response is written out. Safe to enable timeout handling.
+            timeoutHandler.setEnabledTimeout(true);
+          }
+        });
       }
     }
 

Reply via email to