Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/578#discussion_r99261692
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---
@@ -88,22 +129,183 @@ public void submitQuery(UserResultsListener
resultsListener, RunQuery query) {
send(queryResultHandler.getWrappedListener(resultsListener),
RpcType.RUN_QUERY, query, QueryId.class);
}
- public void connect(RpcConnectionHandler<ServerConnection> handler,
DrillbitEndpoint endpoint,
- UserProperties props, UserBitShared.UserCredentials
credentials) {
+ public CheckedFuture<Void, RpcException> connect(DrillbitEndpoint
endpoint, DrillProperties parameters,
+ UserCredentials
credentials) {
+ final FutureHandler handler = new FutureHandler();
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
.setSupportComplexTypes(supportComplexTypes)
.setSupportTimeout(true)
.setCredentials(credentials)
- .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName));
+ .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
+ .setSaslSupport(SaslSupport.SASL_AUTH)
+ .setProperties(parameters.serializeForServer());
+ this.properties = parameters;
+
+
connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler),
+ hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
+ return handler;
+ }
+
+ /**
+ * Check (after {@link #connect connecting}) if server requires
authentication.
+ *
+ * @return true if server requires authentication
+ */
+ public boolean serverRequiresAuthentication() {
+ return supportedAuthMechs != null;
+ }
+
+ /**
+ * Returns a list of supported authentication mechanism. If called
before {@link #connect connecting},
+ * returns null. If called after {@link #connect connecting}, returns a
list of supported mechanisms
+ * iff authentication is required.
+ *
+ * @return list of supported authentication mechanisms
+ */
+ public List<String> getSupportedAuthenticationMechanisms() {
+ return supportedAuthMechs;
+ }
- if (props != null) {
- hsBuilder.setProperties(props);
+ /**
+ * Authenticate to the server asynchronously. Returns a future that
{@link CheckedFuture#checkedGet results}
+ * in null if authentication succeeds, or throws a {@link SaslException}
with relevant message if
+ * authentication fails.
+ *
+ * This method uses properties provided at {@link #connect connection
time} and override them with the
+ * given properties, if any.
+ *
+ * @param overrides parameter overrides
+ * @return result of authentication request
+ */
+ public CheckedFuture<Void, SaslException> authenticate(final
DrillProperties overrides) {
+ if (supportedAuthMechs == null) {
+ throw new IllegalStateException("Server does not require
authentication.");
}
+ properties.merge(overrides);
+ final Map<String, String> propertiesMap =
properties.stringPropertiesAsMap();
-
this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler),
- hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
// future used in SASL exchange
+ final CheckedFuture<Void, SaslException> future =
+ new AbstractCheckedFuture<Void, SaslException>(settableFuture) {
+
+ @Override
+ protected SaslException mapException(Exception e) {
+ if (connection != null) {
+ connection.close(); // to ensure connection is dropped
+ }
+ if (e instanceof ExecutionException) {
+ final Throwable cause = e.getCause();
+ if (cause instanceof SaslException) {
+ return new SaslException("Authentication failed: " +
cause.getMessage(), cause);
+ }
+ }
+ return new SaslException("Authentication failed
unexpectedly.", e);
+ }
+ };
+
+ final AuthenticatorFactory factory;
+ try {
+ factory = getAuthenticatorFactory();
+ } catch (final SaslException e) {
+ settableFuture.setException(e);
+ return future;
+ }
+
+ final String mechanismName = factory.getSimpleName();
+ logger.trace("Will try to login for {} mechanism.", mechanismName);
+ final UserGroupInformation ugi;
+ try {
+ ugi = factory.createAndLoginUser(propertiesMap);
+ } catch (final IOException e) {
+ settableFuture.setException(e);
+ return future;
+ }
+
+ logger.trace("Will try to authenticate to server using {} mechanism.",
mechanismName);
+ final SaslClient saslClient;
+ try {
+ saslClient = factory.createSaslClient(ugi, propertiesMap);
+ connection.setSaslClient(saslClient);
+ } catch (final SaslException e) {
+ settableFuture.setException(e);
+ return future;
+ }
+
+ if (saslClient == null) {
+ settableFuture.setException(new SaslException("Cannot initiate
authentication. Insufficient credentials?"));
+ return future;
+ }
+
+ logger.trace("Initiating SASL exchange.");
+ new AuthenticationOutcomeListener<>(this, connection,
RpcType.SASL_MESSAGE, ugi, new RpcOutcomeListener<Void>(){
+
+ @Override
+ public void failed(RpcException ex) {
+ settableFuture.setException(ex);
+ }
+
+ @Override
+ public void success(Void value, ByteBuf buffer) {
+ authComplete = true;
+ settableFuture.set(null);
+ }
+
+ @Override
+ public void interrupted(InterruptedException e) {
+ settableFuture.setException(e);
+ }
+ })
+ .initiate(mechanismName);
--- End diff --
please move this up.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---