Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/950#discussion_r141498641
--- Diff:
exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.common.exceptions.DrillException;
+import org.slf4j.Logger;
+
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @param <CC> Client Connection Listener
+ * @param <HS> Outbound handshake message type
+ * @param <HR> Inbound handshake message type
+ * @param <BC> BasicClient type
+ * <p>
+ * Implements a wrapper class that allows a client connection
to associate different behaviours after
+ * establishing a connection with the server. The client can
choose to send an application handshake, or
+ * in the case of SSL, wait for a SSL handshake completion and
then send an application handshake.
+ */
+
+public class ConnectionMultiListener<CC extends ClientConnection, HS
extends MessageLite, HR extends MessageLite, BC extends BasicClient> {
+
+ private static final Logger logger =
org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class);
+
+ private final RpcConnectionHandler<CC> connectionListener;
+ private final HS handshakeValue;
+ private final BC parent;
+
+ private ConnectionMultiListener(RpcConnectionHandler<CC>
connectionListener, HS handshakeValue,
+ BC basicClient) {
+ assert connectionListener != null;
+ assert handshakeValue != null;
+
+ this.connectionListener = connectionListener;
+ this.handshakeValue = handshakeValue;
+ this.parent = basicClient;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <CC extends ClientConnection, HS extends MessageLite, BC
extends BasicClient> Builder
+ newBuilder(RpcConnectionHandler<CC> connectionListener, HS
handshakeValue,
+ BC basicClient) {
+ return new Builder(connectionListener, handshakeValue, basicClient);
+ }
+
+ public ConnectionHandler connectionHandler = null;
+ public HandshakeSendHandler handshakeSendHandler = null;
+ public SSLConnectionHandler sslConnectionHandler = null;
+
+ /**
+ * Manages connection establishment outcomes.
+ */
+ private class ConnectionHandler implements
GenericFutureListener<ChannelFuture> {
+
+ @Override public void operationComplete(ChannelFuture future) throws
Exception {
+ boolean isInterrupted = false;
+
+ // We want to wait for at least 120 secs when interrupts occur.
Establishing a connection fails/succeeds quickly,
+ // So there is no point propagating the interruption as failure
immediately.
+ long remainingWaitTimeMills = 120000;
+ long startTime = System.currentTimeMillis();
+ // logger.debug("Connection operation finished. Success: {}",
future.isSuccess());
+ while (true) {
+ try {
+ future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
+ if (future.isSuccess()) {
+ SocketAddress remote = future.channel().remoteAddress();
+ SocketAddress local = future.channel().localAddress();
+ parent.setAddresses(remote, local);
+ // if SSL is enabled send the handshake after the ssl
handshake is completed, otherwise send it
+ // now
+ if(!parent.isSslEnabled()) {
+ // send a handshake on the current thread. This is the only
time we will send from within the event thread.
+ // We can do this because the connection will not be backed
up.
+ parent.send(handshakeSendHandler, handshakeValue, true);
+ }
+ } else {
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
+ new RpcException("General connection failure."));
+ }
+ // logger.debug("Handshake queued for send.");
+ break;
+ } catch (final InterruptedException interruptEx) {
+ remainingWaitTimeMills -= (System.currentTimeMillis() -
startTime);
+ startTime = System.currentTimeMillis();
+ isInterrupted = true;
+ if (remainingWaitTimeMills < 1) {
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
interruptEx);
+ break;
+ }
+ // Ignore the interrupt and continue to wait until we elapse
remainingWaitTimeMills.
+ } catch (final Exception ex) {
+ logger.error("Failed to establish connection", ex);
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
ex);
+ break;
+ }
+ }
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code
higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private class SSLConnectionHandler implements
GenericFutureListener<Future<Channel>> {
+ @Override public void operationComplete(Future<Channel> future) throws
Exception {
+ // send the handshake
+ parent.send(handshakeSendHandler, handshakeValue, true);
+ }
+ }
+
+ /**
+ * manages handshake outcomes.
+ */
+ private class HandshakeSendHandler implements RpcOutcomeListener<HR> {
+
+ @Override public void failed(RpcException ex) {
+ logger.debug("Failure while initiating handshake", ex);
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION,
ex);
+ }
+
+ @Override public void success(HR value, ByteBuf buffer) {
+ // logger.debug("Handshake received. {}", value);
+ try {
+ parent.validateHandshake(value);
+ parent.finalizeConnection(value, parent.connection);
+ connectionListener.connectionSucceeded((CC) parent.connection);
+ // logger.debug("Handshake completed succesfully.");
+ } catch (Exception ex) {
+ logger.debug("Failure while validating handshake", ex);
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION,
ex);
+ }
+ }
+
+ @Override public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for handshake response", ex);
+
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION,
ex);
+ }
+ }
+
+ /*
+ The SSL Handshake listener is special in that it is needed at the time
of initializing an SSL
+ enabled pipeline and so is instantiated before the instance of the
outer class may be needed.
+ We create an instance and set a reference back to the outer class
instance when it is created
+ at the time of connection.
+ */
+ public static class SSLHandshakeListener implements
GenericFutureListener<Future<Channel>> {
+ ConnectionMultiListener parent;
+ public SSLHandshakeListener() {
+ }
+
+ public void setParent(ConnectionMultiListener cml){
+ this.parent = cml;
+ }
+
+ @Override public void operationComplete(Future<Channel> future) throws
Exception {
+ if(parent != null){
+ if(future.isSuccess()) {
+ Channel c = future.get();
+ parent.sslConnectionHandler.operationComplete(future);
+ parent.parent.setSslChannel(c);
+ } else {
+ throw new DrillException("SSL handshake failed.",
future.cause());
+ }
+ } else {
+ throw new RpcException("RPC Setup error. SSL handshake complete
handler is not set up.");
+ }
+ return;
+ }
+ }
+
+
+ public static class Builder<CC extends ClientConnection, HS extends
MessageLite, HR extends MessageLite, BC extends BasicClient> {
+
+ private RpcConnectionHandler<CC> connectionListener;
+ private HS handshakeValue;
+ private BC basicClient;
+ private ConnectionMultiListener cml;
+
+ private Builder(RpcConnectionHandler<CC> connectionListener, HS
handshakeValue, BC basicClient) {
+ this.connectionListener = connectionListener;
+ this.handshakeValue = handshakeValue;
+ this.basicClient = basicClient;
+ this.cml = new ConnectionMultiListener(connectionListener,
handshakeValue, basicClient);
+ }
+
+ public Builder enableSSL() {
+ cml.connectionHandler = cml.new ConnectionHandler();
+ cml.sslConnectionHandler = cml.new SSLConnectionHandler();
+ return this;
+ }
+
+ public Builder enablePlain() {
+ cml.connectionHandler = cml.new ConnectionHandler();
+ return this;
+ }
+
+ public Builder enableHandshake() {
+ cml.handshakeSendHandler = cml.new HandshakeSendHandler();
+ return this;
+ }
+
+ public ConnectionMultiListener build() {
+ //always enable handshake
+ if (cml.handshakeSendHandler == null) {
+ enableHandshake();
+ }
+ if (cml.connectionHandler == null && cml.sslConnectionHandler ==
null) {
+ enablePlain();
+ }
+ return cml;
--- End diff --
Agreed: I can defer creation of the ConnectionMultiListener to the build()
method.
I thought that we should let the caller decide which handlers it wants to
enable. However, I'm clearly overriding that choice in the build() method, so
perhaps you're right.
---