pgaref commented on a change in pull request #1778:
URL: https://github.com/apache/hive/pull/1778#discussion_r612289762



##########
File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java
##########
@@ -71,15 +72,39 @@ public long transferTo(WritableByteChannel target, long 
position)
       throws IOException {
     if (manageOsCache && readaheadPool != null) {
       readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          getPosition() + position, readaheadLength,
-          getPosition() + getCount(), readaheadRequest);
+          position() + position, readaheadLength,
+          position() + count(), readaheadRequest);
     }
-    
+    long written = 0;

Review comment:
       Shall we simplify this to:
   
   ```
   transferred = true;
   if (this.shuffleTransferToAllowed) {
     return super.transferTo(target, position);
   }
   return customShuffleTransfer(target, position);
   ```
     
   
   

##########
File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
##########
@@ -339,27 +350,60 @@ private ShuffleHandler(Configuration conf) {
 
 
   public void start() throws Exception {
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    // Timer is shared across entire factory and must be released separately
-    timer = new HashedWheelTimer();
-    try {
-      pipelineFact = new HttpPipelineFactory(conf, timer);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
-    bootstrap.setOption("backlog", NetUtil.SOMAXCONN);
+    ServerBootstrap bootstrap = new ServerBootstrap()
+        .channel(NioServerSocketChannel.class)
+        .group(bossGroup, workerGroup)
+        .localAddress(port)
+        .option(ChannelOption.SO_BACKLOG, NetUtil.SOMAXCONN)
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
+    initPipeline(bootstrap, conf);
+
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    Channel ch = bootstrap.bind().sync().channel();
     accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    port = ((InetSocketAddress)ch.localAddress()).getPort();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
+    SHUFFLE.setPort(port);
     if (dirWatcher != null) {
       dirWatcher.start();
     }
-    LOG.info("LlapShuffleHandler" + " listening on port " + port + " 
(SOMAXCONN: " + bootstrap.getOption("backlog")
-      + ")");
+    LOG.info("LlapShuffleHandler listening on port {} (SOMAXCONN: {})", port, 
NetUtil.SOMAXCONN);
+  }
+
+  private void initPipeline(ServerBootstrap bootstrap, Configuration conf) 
throws Exception {
+    SHUFFLE = getShuffle(conf);
+    // TODO Setup SSL Shuffle

Review comment:
      I know this is copy-pasted from below, but do we have a ticket for this?
   Is it still needed?

##########
File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
##########
@@ -1031,25 +1038,14 @@ protected ChannelFuture 
sendMapOutput(ChannelHandlerContext ctx, Channel ch,
             info.getStartOffset(), info.getPartLength(), manageOsCache, 
readaheadLength,
             readaheadPool, spillfile.getAbsolutePath(), 
             shuffleBufferSize, shuffleTransferToAllowed, 
canEvictAfterTransfer);
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new ChannelFutureListener() {
-            // TODO error handling; distinguish IO/connection failures,
-            //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.releaseExternalResources();
-          }
-        });
+        writeFuture = ch.writeAndFlush(partition);

Review comment:
      This looks much cleaner with the deallocate() call replacing the completion 
listeners.

##########
File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java
##########
@@ -124,39 +149,33 @@ long customShuffleTransfer(WritableByteChannel target, 
long position)
         position += trans; 
         trans = 0;
       }
-      
+
       //write data to the target
       while(byteBuffer.hasRemaining()) {
         target.write(byteBuffer);
       }
       
       byteBuffer.clear();
     }
-    
+
     return actualCount - trans;
   }
 
-  
-  @Override
-  public void releaseExternalResources() {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    super.releaseExternalResources();
-  }
-  
   /**
    * Call when the transfer completes successfully so we can advise the OS that
    * we don't need the region to be cached anymore.
    */
   public void transferSuccessful() {
-    if (manageOsCache && getCount() > 0) {
+    if (manageOsCache && count() > 0) {
       try {
         if (canEvictAfterTransfer) {
-          LOG.debug("shuffleBufferSize: {}, path: {}", shuffleBufferSize, 
identifier);
-          
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-              fd, getPosition(), getCount(),
-              NativeIO.POSIX.POSIX_FADV_DONTNEED);
+          if (fd.valid()) {

Review comment:
      Shall we keep the original log message for advising the file descriptor 
here as well?
   Also, why do we need the extra fd.valid() check here? (Maybe leave a comment explaining it?)

##########
File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
##########
@@ -797,16 +803,17 @@ public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent evt)
 
       Map<String, MapOutputInfo> mapOutputInfoMap =
           new HashMap<String, MapOutputInfo>();
-      Channel ch = evt.getChannel();
-
+      Channel ch = ctx.channel();
       // In case of KeepAlive, ensure that timeout handler does not close 
connection until entire
       // response is written (i.e, response headers + mapOutput).
-      ChannelPipeline pipeline = ch.getPipeline();
+      ChannelPipeline pipeline = ch.pipeline();
       TimeoutHandler timeoutHandler = 
(TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
       timeoutHandler.setEnabledTimeout(false);
 
       String user = userRsrc.get(jobId);
-
+      if (keepAliveParam || connectionKeepAliveEnabled){

Review comment:
      Why is this needed now? How was the connection kept alive before?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to