[ https://issues.apache.org/jira/browse/DRILL-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15625915#comment-15625915 ]
ASF GitHub Bot commented on DRILL-4280: --------------------------------------- Github user sudheeshkatkam commented on a diff in the pull request: https://github.com/apache/drill/pull/578#discussion_r85853323 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java --- @@ -78,21 +101,241 @@ public void submitQuery(UserResultsListener resultsListener, RunQuery query) { send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class); } - public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, - UserProperties props, UserBitShared.UserCredentials credentials) { + public CheckedFuture<Void, RpcException> connect(DrillbitEndpoint endpoint, ConnectionParameters parameters, + UserCredentials credentials) { + final FutureHandler handler = new FutureHandler(); UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) .setSupportListening(true) .setSupportComplexTypes(supportComplexTypes) .setSupportTimeout(true) - .setCredentials(credentials); + .setCredentials(credentials) + .setProperties(parameters.serializeForServer()); + this.parameters = parameters; + + connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler), + hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); + return handler; + } + + /** + * Check (after {@link #connect connecting}) if server requires authentication. + * + * @return true if server requires authentication + */ + public boolean serverRequiresAuthentication() { + return supportedAuthMechs != null; + } + + /** + * Returns a list of supported authentication mechanism. If called before {@link #connect connecting}, + * returns null. If called after {@link #connect connecting}, returns a list of supported mechanisms + * iff authentication is required. + * + * @return list of supported authentication mechanisms + */ + public List<String> getSupportedAuthenticationMechanisms() { + return supportedAuthMechs; + } + + /** + * Authenticate to the server asynchronously. Returns a future that {@link CheckedFuture#checkedGet results} + * in null if authentication succeeds, or throws a {@link SaslException} with relevant message if + * authentication fails. + * + * This method uses parameters provided at {@link #connect connection time} and override them with the + * given parameters, if any. + * + * @param overrides parameter overrides + * @return result of authentication request + */ + public CheckedFuture<Void, SaslException> authenticate(final ConnectionParameters overrides) { + if (supportedAuthMechs == null) { + throw new IllegalStateException("Server does not require authentication."); + } + parameters.merge(overrides); + + final SettableFuture<Void> settableFuture = SettableFuture.create(); // future used in SASL exchange + final CheckedFuture<Void, SaslException> future = + new AbstractCheckedFuture<Void, SaslException>(settableFuture) { + + @Override + protected SaslException mapException(Exception e) { + if (connection != null) { + connection.close(); // to ensure connection is dropped + } + if (e instanceof ExecutionException) { + final Throwable cause = e.getCause(); + if (cause instanceof SaslException) { + return new SaslException("Authentication failed: " + cause.getMessage(), cause); + } + } + return new SaslException("Authentication failed unexpectedly.", e); + } + }; - if (props != null) { - hsBuilder.setProperties(props); + final ClientAuthenticationProvider authenticationProvider; + try { + authenticationProvider = + UserAuthenticationUtil.getClientAuthenticationProvider(parameters, supportedAuthMechs); + } catch (final SaslException e) { + settableFuture.setException(e); + return future; } - this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler), - hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); + final String providerName = authenticationProvider.name(); + logger.trace("Will try to login for {} mechanism.", providerName); + final UserGroupInformation ugi; + try { + ugi = authenticationProvider.login(parameters); + } catch (final SaslException e) { + settableFuture.setException(e); + return future; + } + + logger.trace("Will try to authenticate to server using {} mechanism.", providerName); + try { + saslClient = authenticationProvider.createSaslClient(ugi, parameters); + } catch (final SaslException e) { + settableFuture.setException(e); + return future; + } + + if (saslClient == null) { + settableFuture.setException(new SaslException("Cannot initiate authentication. Insufficient credentials?")); + return future; + } + logger.trace("Initiating SASL exchange."); + + try { + final ByteString responseData; + if (saslClient.hasInitialResponse()) { + responseData = ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0])); + } else { + responseData = ByteString.EMPTY; + } + send(new SaslChallengeHandler(ugi, settableFuture), + RpcType.SASL_MESSAGE, + SaslMessage.newBuilder() + .setMechanism(providerName) + .setStatus(SaslStatus.SASL_START) + .setData(responseData) + .build(), + SaslMessage.class); + logger.trace("Initiated SASL exchange."); + } catch (final SaslException e) { + settableFuture.setException(e); + } + return future; + } + + private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient, + final byte[] challenge) throws SaslException { + try { + return ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + @Override + public byte[] run() throws Exception { + return saslClient.evaluateChallenge(challenge); + } + }); + } catch (final UndeclaredThrowableException e) { + final Throwable cause = e.getCause(); + if (cause instanceof SaslException) { + throw (SaslException) cause; + } else { + throw new SaslException( + String.format("Unexpected failure (%s)", saslClient.getMechanismName()), cause); + } + } catch (final IOException | InterruptedException e) { + throw new SaslException(String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e); + } + } + + // handles SASL message exchange + private class SaslChallengeHandler implements RpcOutcomeListener<SaslMessage> { + + private final UserGroupInformation ugi; + private final SettableFuture<Void> future; + + public SaslChallengeHandler(UserGroupInformation ugi, SettableFuture<Void> future) { + this.ugi = ugi; + this.future = future; + } + + @Override + public void failed(RpcException ex) { + future.setException(new SaslException("Unexpected failure", ex)); + } + + @Override + public void success(SaslMessage value, ByteBuf buffer) { + logger.trace("Server responded with message of type: {}", value.getStatus()); + switch (value.getStatus()) { + case SASL_AUTH_IN_PROGRESS: { + try { + final SaslMessage.Builder response = SaslMessage.newBuilder(); + final byte[] responseBytes = evaluateChallenge(ugi, saslClient, value.getData().toByteArray()); + final boolean isComplete = saslClient.isComplete(); + logger.trace("Evaluated challenge. Completed? {}. Sending response to server.", isComplete); + response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY); + // if isComplete, the client will get one more response from server + response.setStatus(isComplete ? SaslStatus.SASL_AUTH_SUCCESS : SaslStatus.SASL_AUTH_IN_PROGRESS); + send(new SaslChallengeHandler(ugi, future), + connection, + RpcType.SASL_MESSAGE, + response.build(), + SaslMessage.class, + true // the connection will not be backed up at this point + ); + } catch (Exception e) { + future.setException(e); + } + break; + } + case SASL_AUTH_SUCCESS: { + try { + if (saslClient.isComplete()) { + logger.trace("Successfully authenticated to server using {}", saslClient.getMechanismName()); + saslClient.dispose(); --- End diff -- Will update PR with the latest changes that include refactoring this `switch case` statement. > Kerberos Authentication > ----------------------- > > Key: DRILL-4280 > URL: https://issues.apache.org/jira/browse/DRILL-4280 > Project: Apache Drill > Issue Type: Improvement > Reporter: Keys Botzum > Assignee: Chunhui Shi > Labels: security > > Drill should support Kerberos based authentication from clients. This means > that both the ODBC and JDBC drivers as well as the web/REST interfaces should > support inbound Kerberos. For Web this would most likely be SPNEGO while for > ODBC and JDBC this will be more generic Kerberos. > Since Hive and much of Hadoop supports Kerberos there is a potential for a > lot of reuse of ideas if not implementation. > Note that this is related to but not the same as > https://issues.apache.org/jira/browse/DRILL-3584 -- This message was sent by Atlassian JIRA (v6.3.4#6332)