Github user laurentgo commented on a diff in the pull request:
https://github.com/apache/drill/pull/578#discussion_r102344639
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.ServerConnection;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ServerAuthenticationHandler<C extends ServerConnection, T
extends EnumLite> implements RequestHandler<C> {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(ServerAuthenticationHandler.class);
+
+ private static final ImmutableMap<SaslStatus, SaslResponseProcessor>
RESPONSE_PROCESSORS;
+
+ static {
+ final Map<SaslStatus, SaslResponseProcessor> map = new
EnumMap<>(SaslStatus.class);
+ map.put(SaslStatus.SASL_START, new SaslStartProcessor());
+ map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+ map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+ map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+ RESPONSE_PROCESSORS = Maps.immutableEnumMap(map);
+ }
+
+ private final RequestHandler<C> requestHandler;
+ private final int saslRequestTypeValue;
+ private final T saslResponseType;
+
+ public ServerAuthenticationHandler(final RequestHandler<C>
requestHandler, final int saslRequestTypeValue,
+ final T saslResponseType) {
+ this.requestHandler = requestHandler;
+ this.saslRequestTypeValue = saslRequestTypeValue;
+ this.saslResponseType = saslResponseType;
+ }
+
+ @Override
+ public void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf
dBody, ResponseSender sender)
+ throws RpcException {
+ final String remoteAddress = connection.getRemoteAddress().toString();
+
+ // exchange involves server "challenges" and client "responses"
(initiated by client)
+ if (saslRequestTypeValue == rpcType) {
+ final SaslMessage saslResponse;
+ try {
+ saslResponse = SaslMessage.PARSER.parseFrom(new
ByteBufInputStream(pBody));
+ } catch (final InvalidProtocolBufferException e) {
+ handleAuthFailure(connection, remoteAddress, sender, e,
saslResponseType);
+ return;
+ }
+
+ logger.trace("Received SASL message {} from {}",
saslResponse.getStatus(), remoteAddress);
+ final SaslResponseProcessor processor =
RESPONSE_PROCESSORS.get(saslResponse.getStatus());
+ if (processor == null) {
+ logger.info("Unknown message type from client from {}. Will stop
authentication.", remoteAddress);
+ handleAuthFailure(connection, remoteAddress, sender, new
SaslException("Received unexpected message"),
+ saslResponseType);
+ return;
+ }
+
+ final SaslResponseContext<C, T> context = new
SaslResponseContext<>(saslResponse, connection, remoteAddress,
+ sender, requestHandler, saslResponseType);
+ try {
+ processor.process(context);
+ } catch (final Exception e) {
+ handleAuthFailure(connection, remoteAddress, sender, e,
saslResponseType);
+ }
+ } else {
+
+ // this handler only handles messages of SASL_MESSAGE_VALUE type
+
+ // drop connection
+ connection.close();
+
+ // the response type for this request type is likely known from
UserRpcConfig,
+ // but the client should not be making any requests before
authenticating.
+ throw new UnsupportedOperationException(
+ String.format("Request of type %d is not allowed without
authentication. " +
+ "Client on %s must authenticate before making requests.
Connection dropped.",
+ rpcType, remoteAddress));
+ }
+ }
+
+ private static class SaslResponseContext<C extends ServerConnection, T
extends EnumLite> {
+
+ final SaslMessage saslResponse;
+ final C connection;
+ final String remoteAddress;
+ final ResponseSender sender;
+ final RequestHandler<C> requestHandler;
+ final T saslResponseType;
+
+ SaslResponseContext(SaslMessage saslResponse, C connection, String
remoteAddress, ResponseSender sender,
+ RequestHandler<C> requestHandler, T
saslResponseType) {
+ this.saslResponse = checkNotNull(saslResponse);
+ this.connection = checkNotNull(connection);
+ this.remoteAddress = checkNotNull(remoteAddress);
+ this.sender = checkNotNull(sender);
+ this.requestHandler = checkNotNull(requestHandler);
+ this.saslResponseType = checkNotNull(saslResponseType);
+ }
+ }
+
+ private interface SaslResponseProcessor {
+
+ /**
+ * Process response from client, and if there are no exceptions, send
response using
+ * {@link SaslResponseContext#sender}. Otherwise, throw the exception.
+ *
+ * @param context response context
+ */
+ void process(SaslResponseContext context) throws Exception;
+
+ }
+
+ private static class SaslStartProcessor implements SaslResponseProcessor
{
+
+ @Override
+ public void process(final SaslResponseContext context) throws
Exception {
+
context.connection.initSaslServer(context.saslResponse.getMechanism());
+
+ // assume #evaluateResponse must be called at least once
+
RESPONSE_PROCESSORS.get(SaslStatus.SASL_IN_PROGRESS).process(context);
+ }
+ }
+
+ private static class SaslInProgressProcessor implements
SaslResponseProcessor {
+
+ @Override
+ public void process(final SaslResponseContext context) throws
Exception {
+ final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+ final SaslServer saslServer = context.connection.getSaslServer();
+
+ final byte[] challengeBytes = evaluateResponse(saslServer,
context.saslResponse.getData().toByteArray());
+
+ if (saslServer.isComplete()) {
+ challenge.setStatus(SaslStatus.SASL_SUCCESS);
+ if (challengeBytes != null) {
+ challenge.setData(ByteString.copyFrom(challengeBytes));
+ }
+
+ handleSuccess(context, challenge, saslServer);
+ } else {
+ challenge.setStatus(SaslStatus.SASL_IN_PROGRESS)
+ .setData(ByteString.copyFrom(challengeBytes));
+ context.sender.send(new Response(context.saslResponseType,
challenge.build()));
+ }
+ }
+ }
+
+ // only when client succeeds first
+ private static class SaslSuccessProcessor implements
SaslResponseProcessor {
+
+ @Override
+ public void process(final SaslResponseContext context) throws
Exception {
+ // at this point, #isComplete must be false; so try once, fail
otherwise
+ final SaslServer saslServer = context.connection.getSaslServer();
+
+ evaluateResponse(saslServer,
context.saslResponse.getData().toByteArray()); // discard challenge
+
+ if (saslServer.isComplete()) {
+ final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+ challenge.setStatus(SaslStatus.SASL_SUCCESS);
+
+ handleSuccess(context, challenge, saslServer);
+ } else {
+ logger.info("Failed to authenticate client from {}",
context.remoteAddress);
+ throw new SaslException("Client allegedly succeeded
authentication, but server did not. Suspicious?");
+ }
+ }
+ }
+
+ private static class SaslFailedProcessor implements
SaslResponseProcessor {
+
+ @Override
+ public void process(final SaslResponseContext context) throws
Exception {
+ logger.info("Client from {} failed authentication graciously, and
does not want to continue.",
+ context.remoteAddress);
+ throw new SaslException("Client graciously failed authentication");
+ }
+ }
+
+ private static byte[] evaluateResponse(final SaslServer saslServer,
+ final byte[] responseBytes)
throws SaslException {
+ try {
+ return UserGroupInformation.getLoginUser().doAs(new
PrivilegedExceptionAction<byte[]>() {
+ @Override
+ public byte[] run() throws Exception {
+ return saslServer.evaluateResponse(responseBytes);
+ }
+ });
+ } catch (final UndeclaredThrowableException e) {
+ throw new SaslException(String.format("Unexpected failure trying to
authenticate using %s",
+ saslServer.getMechanismName()), e.getCause());
+ } catch (final IOException | InterruptedException e) {
+ if (e instanceof SaslException) {
+ throw (SaslException) e;
+ } else {
+ throw new SaslException(String.format("Unexpected failure trying
to authenticate using %s",
+ saslServer.getMechanismName()), e);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void handleSuccess(final SaslResponseContext context,
final SaslMessage.Builder challenge,
+ final SaslServer saslServer) throws
IOException {
+ context.connection.changeHandlerTo(context.requestHandler);
+ context.connection.finalizeSaslSession();
+ context.sender.send(new Response(context.saslResponseType,
challenge.build()));
+
+ // setup security layers here..
--- End diff --
we should probably dispose of the saslServer (`saslServer#dispose()`)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---