This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 004cd012e1 HttpClient: Include error handler on all connection attempts. (#14915)
004cd012e1 is described below

commit 004cd012e1cf94f8bb948d7c350f8c06df44af72
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 29 01:58:04 2023 -0700

    HttpClient: Include error handler on all connection attempts. (#14915)
    
    Currently we have an error handler for https connection attempts, but
    not for plaintext connection attempts. This leads to warnings like the
    following for plaintext connection errors:
    
      EXCEPTION, please implement org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling.
    
    This happens because if we don't add our own error handler, the last
    handler in the chain during a connection attempt is HttpContentDecompressor,
    which doesn't handle errors.
    
    The new error handler for plaintext doesn't do much: it just closes
    the channel.
---
 .../http/client/pool/ChannelResourceFactory.java   | 87 ++++++++++++++++------
 .../java/util/http/client/JankyServersTest.java    | 72 ++++++++++++++++++
 2 files changed, 135 insertions(+), 24 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
 
b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
index d6465be305..c679894588 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -44,6 +44,7 @@ import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.util.Timer;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
@@ -53,13 +54,15 @@ import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
 /**
+ *
  */
 public class ChannelResourceFactory implements ResourceFactory<String, 
ChannelFuture>
 {
   private static final Logger log = new Logger(ChannelResourceFactory.class);
 
   private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(10);
-  private static final String DRUID_PROXY_HANDLER = "druid_proxyHandler";
+  private static final String PROXY_HANDLER_NAME = "druid-proxy";
+  private static final String ERROR_HANDLER_NAME = "druid-connection-error";
 
   private final ClientBootstrap bootstrap;
   private final SSLContext sslContext;
@@ -128,7 +131,7 @@ public class ChannelResourceFactory implements 
ResourceFactory<String, ChannelFu
           if (f1.isSuccess()) {
             final Channel channel = f1.getChannel();
             channel.getPipeline().addLast(
-                DRUID_PROXY_HANDLER,
+                PROXY_HANDLER_NAME,
                 new SimpleChannelUpstreamHandler()
                 {
                   @Override
@@ -137,7 +140,7 @@ public class ChannelResourceFactory implements 
ResourceFactory<String, ChannelFu
                     Object msg = e.getMessage();
 
                     final ChannelPipeline pipeline = ctx.getPipeline();
-                    pipeline.remove(DRUID_PROXY_HANDLER);
+                    pipeline.remove(PROXY_HANDLER_NAME);
 
                     if (msg instanceof HttpResponse) {
                       HttpResponse httpResponse = (HttpResponse) msg;
@@ -217,27 +220,7 @@ public class ChannelResourceFactory implements 
ResourceFactory<String, ChannelFu
       sslHandler.setCloseOnSSLException(true);
 
       final ChannelFuture handshakeFuture = 
Channels.future(connectFuture.getChannel());
-      connectFuture.getChannel().getPipeline().addLast(
-          "connectionErrorHandler", new SimpleChannelUpstreamHandler()
-          {
-            @Override
-            public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-            {
-              final Channel channel = ctx.getChannel();
-              if (channel == null) {
-                // For the case where this pipeline is not attached yet.
-                handshakeFuture.setFailure(new ChannelException(
-                    StringUtils.format("Channel is null. The context name is 
[%s]", ctx.getName())
-                ));
-                return;
-              }
-              handshakeFuture.setFailure(e.getCause());
-              if (channel.isOpen()) {
-                channel.close();
-              }
-            }
-          }
-      );
+      connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new 
ConnectionErrorHandler(handshakeFuture));
       connectFuture.addListener(
           new ChannelFutureListener()
           {
@@ -280,6 +263,7 @@ public class ChannelResourceFactory implements 
ResourceFactory<String, ChannelFu
 
       retVal = handshakeFuture;
     } else {
+      connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new 
ConnectionErrorHandler(null));
       retVal = connectFuture;
     }
 
@@ -308,4 +292,59 @@ public class ChannelResourceFactory implements 
ResourceFactory<String, ChannelFu
     log.trace("Closing");
     resource.awaitUninterruptibly().getChannel().close();
   }
+
+  /**
+   * Handler that captures errors that occur while connecting. Typically 
superseded by other handlers after
+   * a connection happens, in {@link 
org.apache.druid.java.util.http.client.NettyHttpClient}.
+   *
+   * It's important to have this for all channels, even if {@link #future} is 
null, because otherwise exceptions
+   * that occur during connection land at {@link 
org.jboss.netty.handler.codec.http.HttpContentDecompressor} (the last
+   * handler from {@link 
org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory}) and 
are dropped on
+   * the floor along with a scary-looking warning like "EXCEPTION, please 
implement
+   * 
org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() 
for proper handling."
+   */
+  private static class ConnectionErrorHandler extends 
SimpleChannelUpstreamHandler
+  {
+    @Nullable
+    private final ChannelFuture future;
+
+    /**
+     * Constructor.
+     *
+     * @param future future to attach errors to
+     */
+    public ConnectionErrorHandler(@Nullable ChannelFuture future)
+    {
+      this.future = future;
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final 
ExceptionEvent e)
+    {
+      final Channel channel = ctx.getChannel();
+      if (channel == null) {
+        // For the case where this pipeline is not attached yet.
+        if (future != null && !future.isDone()) {
+          final ChannelException e2 =
+              new ChannelException(StringUtils.format("Channel is null. The 
context name is [%s]", ctx.getName()));
+          e2.addSuppressed(e.getCause());
+          future.setFailure(e2);
+        }
+        return;
+      }
+
+      if (future != null && !future.isDone()) {
+        future.setFailure(e.getCause());
+      }
+
+      // Close the channel if this is the last handler. Otherwise, we expect 
that NettyHttpClient would have added
+      // additional handlers to take care of the errors.
+      //noinspection ObjectEquality
+      if (channel.isOpen() && this == ctx.getPipeline().getLast()) {
+        channel.close();
+      }
+
+      ctx.sendUpstream(e);
+    }
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
index 3d12cf7a1f..ec54bd1350 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java
@@ -296,6 +296,78 @@ public class JankyServersTest
     }
   }
 
+  @Test
+  public void testHttpConnectionRefused() throws Throwable
+  {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      final HttpClientConfig config = 
HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
+      final HttpClient client = HttpClientInit.createClient(config, lifecycle);
+
+      // Need to select a port that isn't being listened to. This approach 
finds an unused port in a racey way.
+      // Hopefully it works most of the time.
+      final ServerSocket sock = new ServerSocket(0);
+      final int port = sock.getLocalPort();
+      sock.close();
+
+      final ListenableFuture<StatusResponseHolder> response = client
+          .go(
+              new Request(HttpMethod.GET, new 
URL(StringUtils.format("http://localhost:%d/", port))),
+              StatusResponseHandler.getInstance()
+          );
+
+      Throwable e = null;
+      try {
+        response.get();
+      }
+      catch (ExecutionException e1) {
+        e = e1.getCause();
+        e1.printStackTrace();
+      }
+
+      Assert.assertTrue("ChannelException thrown by 'get'", 
isChannelClosedException(e));
+    }
+    finally {
+      lifecycle.stop();
+    }
+  }
+
+  @Test
+  public void testHttpsConnectionRefused() throws Throwable
+  {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      final HttpClientConfig config = 
HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
+      final HttpClient client = HttpClientInit.createClient(config, lifecycle);
+
+      // Need to select a port that isn't being listened to. This approach 
finds an unused port in a racey way.
+      // Hopefully it works most of the time.
+      final ServerSocket sock = new ServerSocket(0);
+      final int port = sock.getLocalPort();
+      sock.close();
+
+      final ListenableFuture<StatusResponseHolder> response = client
+          .go(
+              new Request(HttpMethod.GET, new 
URL(StringUtils.format("https://localhost:%d/", port))),
+              StatusResponseHandler.getInstance()
+          );
+
+      Throwable e = null;
+      try {
+        response.get();
+      }
+      catch (ExecutionException e1) {
+        e = e1.getCause();
+        e1.printStackTrace();
+      }
+
+      Assert.assertTrue("ChannelException thrown by 'get'", 
isChannelClosedException(e));
+    }
+    finally {
+      lifecycle.stop();
+    }
+  }
+
   public boolean isChannelClosedException(Throwable e)
   {
     return e instanceof ChannelException ||


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to