http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java deleted file mode 100644 index 312e4ab..0000000 --- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.security.auth.digest; - -import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler; -import org.apache.storm.security.auth.AuthUtils; - -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; -import java.io.IOException; - -/** - * client side callback handler. - */ -public class ClientCallbackHandler extends AbstractSaslClientCallbackHandler { - - /** - * Constructor based on a JAAS configuration - * - * For digest, you should have a pair of user name and password defined. 
- * @throws IOException - */ - public ClientCallbackHandler(Configuration configuration) throws IOException { - if (configuration == null) return; - AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT); - if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT - + "' entry in this configuration: Client cannot start."; - throw new IOException(errorMessage); - } - - _password = ""; - for(AppConfigurationEntry entry: configurationEntries) { - if (entry.getOptions().get(USERNAME) != null) { - _username = (String)entry.getOptions().get(USERNAME); - } - if (entry.getOptions().get(PASSWORD) != null) { - _password = (String)entry.getOptions().get(PASSWORD); - } - } - } - -}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java index 4d123aa..1272712 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java @@ -15,12 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.security.auth.digest; import java.io.IOException; - +import java.util.Map; import javax.security.auth.callback.CallbackHandler; - +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.storm.generated.WorkerToken; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.sasl.SaslTransportPlugin; +import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler; +import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler; +import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer; +import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TTransport; @@ -29,20 +37,19 @@ import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.security.auth.AuthUtils; -import org.apache.storm.security.auth.SaslTransportPlugin; - public class DigestSaslTransportPlugin extends SaslTransportPlugin { public static final String DIGEST = "DIGEST-MD5"; private static final 
Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class); - protected TTransportFactory getServerTransportFactory() throws IOException { + protected TTransportFactory getServerTransportFactory() throws IOException { //create an authentication callback handler - CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf); + CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler( + new WorkerTokenAuthorizer(conf, type), + new JassPasswordProvider(loginConf)); //create a transport factory that will invoke our auth callback for digest TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory(); - factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler); + factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler); LOG.info("SASL DIGEST-MD5 transport factory will be used"); return factory; @@ -50,19 +57,46 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin { @Override public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException { - ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); - TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST, + CallbackHandler clientCallbackHandler; + WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type); + if (token != null) { + clientCallbackHandler = new WorkerTokenClientCallbackHandler(token); + } else if (loginConf != null) { + AppConfigurationEntry [] configurationEntries = loginConf.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT); + if (configurationEntries == null) { + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + + "' entry in this configuration: Client cannot start."; + throw new IOException(errorMessage); + } + + String username = ""; + String password = ""; + for 
(AppConfigurationEntry entry : configurationEntries) { + Map options = entry.getOptions(); + username = (String)options.getOrDefault("username", username); + password = (String)options.getOrDefault("password", password); + } + clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password); + } else { + throw new IOException("Could not find any way to authenticate with the server."); + } + + TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST, null, AuthUtils.SERVICE, serverHost, null, - client_callback_handler, + clientCallbackHandler, transport); - wrapper_transport.open(); + wrapperTransport.open(); LOG.debug("SASL DIGEST-MD5 client transport has been established"); - return wrapper_transport; + return wrapperTransport; } + @Override + public boolean areWorkerTokensSupported() { + return true; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java new file mode 100644 index 0000000..bb3f1bf --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth.digest; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.storm.security.auth.sasl.PasswordProvider; + +/** + * Provides passwords out of a jaas conf for typical MD5-DIGEST authentication support. + */ +public class JassPasswordProvider implements PasswordProvider { + private static final String USER_PREFIX = "user_"; + /** + * The system property that sets a super user password. This can be used in addition to the + * jaas conf, and takes precedent over a "super" user in the jaas conf if this is set. + */ + public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword"; + + private Map<String, char[]> credentials = new ConcurrentHashMap<>(); + + /** + * Constructor. + * @param configuration the jaas configuration to get the credentials out of. + * @throws IOException if we could not read the Server section in the jaas conf. 
+ */ + public JassPasswordProvider(Configuration configuration) throws IOException { + if (configuration == null) { + return; + } + + AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER); + if (configurationEntries == null) { + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + + "' entry in this configuration: Server cannot start."; + throw new IOException(errorMessage); + } + credentials.clear(); + for (AppConfigurationEntry entry : configurationEntries) { + Map<String, ?> options = entry.getOptions(); + // Populate user -> password map with JAAS configuration entries from the "Server" section. + // Usernames are distinguished from other options by prefixing the username with a "user_" prefix. + for (Map.Entry<String, ?> pair : options.entrySet()) { + String key = pair.getKey(); + if (key.startsWith(USER_PREFIX)) { + String userName = key.substring(USER_PREFIX.length()); + credentials.put(userName, ((String) pair.getValue()).toCharArray()); + } + } + } + + String superPassword = System.getProperty(SYSPROP_SUPER_PASSWORD); + if (superPassword != null) { + credentials.put("super", superPassword.toCharArray()); + } + } + + @Override + public Optional<char[]> getPasswordFor(String user) { + return Optional.ofNullable(credentials.get(user)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java deleted file mode 100644 index 7c4414f..0000000 --- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.security.auth.digest; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler; -import org.apache.storm.security.auth.ReqContext; -import org.apache.storm.security.auth.SaslTransportPlugin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.RealmCallback; - -import org.apache.storm.security.auth.AuthUtils; - -/** - * SASL server side callback handler - */ -public class ServerCallbackHandler extends AbstractSaslServerCallbackHandler { - private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class); - private static final String USER_PREFIX = "user_"; - public static final String SYSPROP_SUPER_PASSWORD = 
"storm.SASLAuthenticationProvider.superPassword"; - - public ServerCallbackHandler(Configuration configuration) throws IOException { - if (configuration==null) return; - - AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER); - if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start."; - throw new IOException(errorMessage); - } - credentials.clear(); - for(AppConfigurationEntry entry: configurationEntries) { - Map<String,?> options = entry.getOptions(); - // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section. - // Usernames are distinguished from other options by prefixing the username with a "user_" prefix. - for(Map.Entry<String, ?> pair : options.entrySet()) { - String key = pair.getKey(); - if (key.startsWith(USER_PREFIX)) { - String userName = key.substring(USER_PREFIX.length()); - credentials.put(userName,(String)pair.getValue()); - } - } - } - } - - @Override - protected void handlePasswordCallback(PasswordCallback pc) { - LOG.debug("handlePasswordCallback"); - if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) { - // superuser: use Java system property for password, if available. 
- pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray()); - } else { - super.handlePasswordCallback(pc); - } - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java index b6571ba..157ae54 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java @@ -35,8 +35,12 @@ import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginException; import javax.security.sasl.Sasl; +import org.apache.storm.generated.WorkerToken; import org.apache.storm.messaging.netty.Login; import org.apache.commons.lang.StringUtils; +import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler; +import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer; +import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TTransport; @@ -47,10 +51,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.storm.security.auth.AuthUtils; -import org.apache.storm.security.auth.SaslTransportPlugin; +import org.apache.storm.security.auth.sasl.SaslTransportPlugin; public class KerberosSaslTransportPlugin extends SaslTransportPlugin { - public static final String KERBEROS = "GSSAPI"; + public static final String KERBEROS = "GSSAPI"; + private static final String DIGEST = "DIGEST-MD5"; private static final Logger LOG = 
LoggerFactory.getLogger(KerberosSaslTransportPlugin.class); private static Map <LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>(); private static final String DISABLE_LOGIN_CACHE = "disableLoginCache"; @@ -93,15 +98,16 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { } } + @Override public TTransportFactory getServerTransportFactory() throws IOException { //create an authentication callback handler - CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, topoConf); + CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf); //login our principal Subject subject = null; try { //specify a configuration object to be used - Configuration.setConfiguration(login_conf); + Configuration.setConfiguration(loginConf); //now login Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler); subject = login.getSubject(); @@ -114,15 +120,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { //check the credential of our principal if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { throw new RuntimeException("Fail to verify user principal with section \"" - +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf); + + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf); } - String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); + String principal = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); LOG.debug("principal:"+principal); KerberosName serviceKerberosName = new KerberosName(principal); String serviceName = serviceKerberosName.getServiceName(); String hostName = serviceKerberosName.getHostName(); - Map<String, String> props = new TreeMap<String,String>(); + Map<String, String> props = new TreeMap<>(); props.put(Sasl.QOP, "auth"); props.put(Sasl.SERVER_AUTH, "false"); @@ -130,6 +136,10 @@ public class 
KerberosSaslTransportPlugin extends SaslTransportPlugin { TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory(); factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler); + //Also add in support for worker tokens + factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, + new SimpleSaslServerCallbackHandler(new WorkerTokenAuthorizer(conf, type))); + //create a wrap transport factory so that we could apply user credential during connections TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); @@ -140,9 +150,9 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { private Login mkLogin() throws IOException { try { //create an authentication callback handler - ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); + ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(loginConf); //specify a configuration object to be used - Configuration.setConfiguration(login_conf); + Configuration.setConfiguration(loginConf); //now login Login login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); login.startThreadIfNeeded(); @@ -154,9 +164,28 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { } @Override - public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException { + public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException { + WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type); + if (token != null && asUser != null) { + CallbackHandler clientCallbackHandler = new WorkerTokenClientCallbackHandler(token); + TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST, + null, + AuthUtils.SERVICE, + serverHost, + null, + clientCallbackHandler, + transport); + wrapperTransport.open(); + 
LOG.debug("SASL DIGEST-MD5 WorkerToken client transport has been established"); + + return wrapperTransport; + } + return kerberosConnect(transport, serverHost, asUser); + } + + private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException { //login our user - SortedMap<String, ?> authConf = AuthUtils.pullConfig(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT); + SortedMap<String, ?> authConf = AuthUtils.pullConfig(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT); if (authConf == null) { throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null"); } @@ -194,15 +223,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { final Subject subject = login.getSubject(); if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error throw new RuntimeException("Fail to verify user principal with section \"" - +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf); + +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ loginConf); } final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser; - String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName"); + String serviceName = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName"); if (serviceName == null) { serviceName = AuthUtils.SERVICE; } - Map<String, String> props = new TreeMap<String,String>(); + Map<String, String> props = new TreeMap<>(); props.put(Sasl.QOP, "auth"); props.put(Sasl.SERVER_AUTH, "false"); @@ -246,7 +275,8 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { return ((Principal)(principals.toArray()[0])).getName(); } - /** A TransportFactory that wraps another one, but assumes a specified UGI + /** + * A TransportFactory that wraps another one, but assumes a specified UGI * before calling through. 
* * This is used on the server side to assume the server's Principal when accepting @@ -269,22 +299,25 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { public TTransport getTransport(final TTransport trans) { try { return Subject.doAs(subject, - new PrivilegedExceptionAction<TTransport>() { - public TTransport run() { + (PrivilegedExceptionAction<TTransport>) () -> { try { return wrapped.getTransport(trans); } catch (Exception e) { LOG.debug("Storm server failed to open transport " + - "to interact with a client during session initiation: " + e, e); + "to interact with a client during session initiation: " + e, e); return new NoOpTTrasport(null); } - } - }); + }); } catch (PrivilegedActionException e) { LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e); return null; } } } + + @Override + public boolean areWorkerTokensSupported() { + return true; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java index 59eb80d..d3157c8 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java @@ -18,79 +18,87 @@ package org.apache.storm.security.auth.kerberos; +import javax.security.sasl.RealmCallback; import org.apache.storm.security.auth.AuthUtils; import org.apache.storm.security.auth.ReqContext; -import org.apache.storm.security.auth.SaslTransportPlugin; +import org.apache.storm.security.auth.sasl.SaslTransportPlugin; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import javax.security.auth.Subject; import javax.security.auth.callback.*; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; import javax.security.sasl.AuthorizeCallback; import java.io.IOException; -import java.util.Map; /** - * SASL server side callback handler + * SASL server side callback handler for kerberos auth. */ public class ServerCallbackHandler implements CallbackHandler { private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class); - private String userName; - - public ServerCallbackHandler(Configuration configuration, Map<String, Object> topoConf) throws IOException { - if (configuration==null) return; + public ServerCallbackHandler(Configuration configuration) throws IOException { + if (configuration == null) { + return; + } AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start."; + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + + "' entry in this configuration: Server cannot start."; LOG.error(errorMessage); throw new IOException(errorMessage); } - } public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; for (Callback callback : callbacks) { - if (callback instanceof NameCallback) { - handleNameCallback((NameCallback) callback); + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; } else if (callback instanceof PasswordCallback) { - handlePasswordCallback((PasswordCallback) callback); - } else if (callback instanceof AuthorizeCallback) { - 
handleAuthorizeCallback((AuthorizeCallback) callback); + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + //Ignored... + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL Callback"); } } - } - private void handleNameCallback(NameCallback nc) { - LOG.debug("handleNameCallback"); - userName = nc.getDefaultName(); - nc.setName(nc.getDefaultName()); - } + String userName = "UNKNOWN"; + if (nc != null) { + LOG.debug("handleNameCallback"); + userName = nc.getDefaultName(); + nc.setName(nc.getDefaultName()); + } - private void handlePasswordCallback(PasswordCallback pc) { - LOG.warn("No password found for user: " + userName); - } + if (pc != null) { + LOG.warn("No password found for user: {}", userName); + } - private void handleAuthorizeCallback(AuthorizeCallback ac) { - String authenticationID = ac.getAuthenticationID(); - LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID()); + if (ac != null) { + String authenticationID = ac.getAuthenticationID(); + LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID()); - //if authorizationId is not set, set it to authenticationId. - if(ac.getAuthorizationID() == null) { - ac.setAuthorizedID(authenticationID); - } + //if authorizationId is not set, set it to authenticationId. + if (ac.getAuthorizationID() == null) { + ac.setAuthorizedID(authenticationID); + } - //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We - //add the authNid as the real user in reqContext's subject which will be used during authorization. 
- if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) { - ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID())); - } else { - ReqContext.context().setRealPrincipal(null); - } + //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We + //add the authNid as the real user in reqContext's subject which will be used during authorization. + if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) { + ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID())); + } else { + ReqContext.context().setRealPrincipal(null); + } - ac.setAuthorized(true); + ac.setAuthorized(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java index 13340df..b01cdc4 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java @@ -15,17 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.security.auth.plain; -import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler; +import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler; -public class PlainClientCallbackHandler extends AbstractSaslClientCallbackHandler { +/** + * This should only ever be used for testing. It provides no security at all. + * DO NOT USE THIS. The user name is the current user and the password is + * "password". 
+ */ +@Deprecated +public class PlainClientCallbackHandler extends SimpleSaslClientCallbackHandler { - /* + /** * For plain, using constants for a pair of user name and password. */ public PlainClientCallbackHandler() { - _username = System.getProperty("user.name"); - _password = PASSWORD; + super(System.getProperty("user.name"), "password"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java index eaef91a..2df61c1 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java @@ -15,10 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.security.auth.plain; +import java.util.Optional; import org.apache.storm.security.auth.AuthUtils; -import org.apache.storm.security.auth.SaslTransportPlugin; +import org.apache.storm.security.auth.sasl.SaslTransportPlugin; +import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TTransport; @@ -31,6 +34,11 @@ import javax.security.auth.callback.CallbackHandler; import java.io.IOException; import java.security.Security; +/** + * This should never be used except for testing. It provides no security at all. + * The password is hard coded, and even if it were not it is sent in plain text. 
+ */ +@Deprecated public class PlainSaslTransportPlugin extends SaslTransportPlugin { public static final String PLAIN = "PLAIN"; private static final Logger LOG = LoggerFactory.getLogger(PlainSaslTransportPlugin.class); @@ -38,7 +46,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin { @Override protected TTransportFactory getServerTransportFactory() throws IOException { //create an authentication callback handler - CallbackHandler serverCallbackHandler = new PlainServerCallbackHandler(); + CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler((userName) -> Optional.of("password".toCharArray())); if (Security.getProvider(SaslPlainServer.SecurityProvider.SASL_PLAIN_SERVER) == null) { Security.addProvider(new SaslPlainServer.SecurityProvider()); } @@ -46,7 +54,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin { TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory(); factory.addServerDefinition(PLAIN, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler); - LOG.info("SASL PLAIN transport factory will be used"); + LOG.error("SASL PLAIN transport factory will be used. This is totally insecure. Please do not use this."); return factory; } @@ -62,10 +70,8 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin { transport); wrapperTransport.open(); - LOG.debug("SASL PLAIN client transport has been established"); + LOG.error("SASL PLAIN client transport has been established. This is totally insecure. 
Please do not use this."); return wrapperTransport; - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java deleted file mode 100644 index c646fc9..0000000 --- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.security.auth.plain; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler; -import org.apache.storm.security.auth.ReqContext; -import org.apache.storm.security.auth.SaslTransportPlugin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.RealmCallback; - -/** - * SASL server side callback handler - */ -public class PlainServerCallbackHandler extends AbstractSaslServerCallbackHandler { - private static final Logger LOG = LoggerFactory.getLogger(PlainServerCallbackHandler.class); - public static final String PASSWORD = "password"; - - public PlainServerCallbackHandler() throws IOException { - userName=null; - } - - protected void handlePasswordCallback(PasswordCallback pc) { - LOG.debug("handlePasswordCallback"); - pc.setPassword(PASSWORD.toCharArray()); - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java index c84ce77..62ee872 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.security.auth.plain; import javax.security.auth.callback.Callback; @@ -29,6 +30,7 @@ import javax.security.sasl.SaslServerFactory; import java.security.Provider; import java.util.Map; +@Deprecated public class SaslPlainServer implements SaslServer { @SuppressWarnings("serial") public static class SecurityProvider extends Provider { http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java new file mode 100644 index 0000000..d1e0c0f --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth.sasl; + +import java.util.Optional; + +/** + * A very basic API that will provide a password for a given user name. 
+ * This is intended to be used with the SimpleSaslServerCallbackHandler + * to verify a user that is attempting to log in. + */ +public interface PasswordProvider { + /** + * Get an optional password for a user. If no password for the user is found + * the option will be empty and another PasswordProvider would be tried. + * @param user the user this is for. + * @return the password if it is found. + */ + Optional<char[]> getPasswordFor(String user); + + /** + * Convert the supplied user name to the actual user name that should be used + * in the system. This may be called on any name. If it cannot be translated + * then a null may be returned or an exception thrown. If getPasswordFor returns successfully + * this should not return null, nor throw an exception for the same user. + * @param user the SASL negotiated user name. + * @return the user name that storm should use. + */ + default String userName(String user) { + return user; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java new file mode 100644 index 0000000..71cdbf5 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth.sasl; + +import java.io.IOException; +import java.net.Socket; +import java.security.Principal; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.security.auth.Subject; +import javax.security.auth.login.Configuration; +import javax.security.sasl.SaslServer; +import org.apache.storm.security.auth.ITransportPlugin; +import org.apache.storm.security.auth.ReqContext; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.storm.security.auth.kerberos.NoOpTTrasport; +import org.apache.storm.utils.ExtendedThreadPoolExecutor; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +/** + * Base class for SASL authentication plugin. 
+ */ +public abstract class SaslTransportPlugin implements ITransportPlugin { + protected ThriftConnectionType type; + protected Map<String, Object> conf; + protected Configuration loginConf; + private int port; + + @Override + public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) { + this.type = type; + this.conf = conf; + this.loginConf = loginConf; + } + + @Override + public TServer getServer(TProcessor processor) throws IOException, TTransportException { + int configuredPort = type.getPort(conf); + Integer socketTimeout = type.getSocketTimeOut(conf); + TTransportFactory serverTransportFactory = getServerTransportFactory(); + TServerSocket serverTransport = null; + if (socketTimeout != null) { + serverTransport = new TServerSocket(configuredPort, socketTimeout); + } else { + serverTransport = new TServerSocket(configuredPort); + } + this.port = serverTransport.getServerSocket().getLocalPort(); + int numWorkerThreads = type.getNumThreads(conf); + Integer queueSize = type.getQueueSize(conf); + + TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport) + .processor(new TUGIWrapProcessor(processor)) + .minWorkerThreads(numWorkerThreads) + .maxWorkerThreads(numWorkerThreads) + .protocolFactory(new TBinaryProtocol.Factory(false, true)); + + if (serverTransportFactory != null) { + serverArgs.transportFactory(serverTransportFactory); + } + BlockingQueue workQueue = new SynchronousQueue(); + if (queueSize != null) { + workQueue = new ArrayBlockingQueue(queueSize); + } + ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, + 60, TimeUnit.SECONDS, workQueue); + serverArgs.executorService(executorService); + return new TThreadPoolServer(serverArgs); + } + + /** + * Create the transport factory needed for serving. All subclasses must implement this method. + * @return server transport factory + * @throws IOException on any error.
+ */ + protected abstract TTransportFactory getServerTransportFactory() throws IOException; + + @Override + public int getPort() { + return this.port; + } + + + /** + * Processor that pulls the SaslServer object out of the transport, and + * assumes the remote user's UGI before calling through to the original + * processor. This is used on the server side to set the UGI for each specific call. + */ + private static class TUGIWrapProcessor implements TProcessor { + final TProcessor wrapped; + + TUGIWrapProcessor(TProcessor wrapped) { + this.wrapped = wrapped; + } + + public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { + //populating request context + ReqContext reqContext = ReqContext.context(); + + TTransport trans = inProt.getTransport(); + //Sasl transport + TSaslServerTransport saslTrans = (TSaslServerTransport)trans; + + if (trans instanceof NoOpTTrasport) { + return false; + } + + //remote address + TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport(); + Socket socket = tsocket.getSocket(); + reqContext.setRemoteAddress(socket.getInetAddress()); + + //remote subject + SaslServer saslServer = saslTrans.getSaslServer(); + String authId = saslServer.getAuthorizationID(); + Subject remoteUser = new Subject(); + remoteUser.getPrincipals().add(new User(authId)); + reqContext.setSubject(remoteUser); + + //invoke service handler + return wrapped.process(inProt, outProt); + } + } + + public static class User implements Principal { + private final String name; + + public User(String name) { + this.name = name; + } + + /** + * Get the full name of the user. 
+ */ + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return !(o == null || getClass() != o.getClass()) && (name.equals(((User) o).name)); + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public String toString() { + return name; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java new file mode 100644 index 0000000..2242f0c --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.security.auth.sasl; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; + +/** + * A client callback handler that supports a single username and password. + */ +public class SimpleSaslClientCallbackHandler implements CallbackHandler { + private final String username; + private final String password; + + /** + * Constructor. + * @param username the username to use. + * @param password the password to use. + */ + public SimpleSaslClientCallbackHandler(String username, String password) { + this.username = username; + this.password = password; + } + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback c : callbacks) { + if (c instanceof NameCallback) { + NameCallback nc = (NameCallback) c; + nc.setName(username); + } else if (c instanceof PasswordCallback) { + PasswordCallback pc = (PasswordCallback)c; + if (password != null) { + pc.setPassword(password.toCharArray()); + } + } else if (c instanceof AuthorizeCallback) { + AuthorizeCallback ac = (AuthorizeCallback) c; + String authid = ac.getAuthenticationID(); + String authzid = ac.getAuthorizationID(); + if (authid.equals(authzid)) { + ac.setAuthorized(true); + } else { + ac.setAuthorized(false); + } + if (ac.isAuthorized()) { + ac.setAuthorizedID(authzid); + } + } else if (c instanceof RealmCallback) { + RealmCallback rc = (RealmCallback) c; + ((RealmCallback) c).setText(rc.getDefaultText()); + } else { + throw new UnsupportedCallbackException(c); + } + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java new file mode 100644 index 0000000..d64fa1b --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.security.auth.sasl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import org.apache.storm.security.auth.ReqContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleSaslServerCallbackHandler implements CallbackHandler { + private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class); + private final List<PasswordProvider> providers; + + /** + * Constructor with different password providers. + * @param providers what will provide a password. They will be checked in order, and the first one to + * return a password wins. + */ + public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) { + this(Arrays.asList(providers)); + } + + /** + * Constructor with different password providers. + * @param providers what will provide a password. They will be checked in order, and the first one to + * return a password wins. 
+ */ + public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) { + this.providers = new ArrayList<>(providers); + } + + private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) { + if (LOG.isDebugEnabled()) { + String acs = "null"; + if (ac != null) { + acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID(); + } + + String ncs = "null"; + if (nc != null) { + ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName(); + } + + String pcs = "null"; + if (pc != null) { + char[] pwd = pc.getPassword(); + pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length); + } + + String rcs = "null"; + if (rc != null) { + rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText(); + } + LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs); + } + } + + private String translateName(String orig) { + for (PasswordProvider provider: providers) { + try { + String ret = provider.userName(orig); + if (ret != null) { + return ret; + } + } catch (Exception e) { + //Translating the name (this call) happens in a different callback from validating + // the user name and password. This has to be stateless though, so we cannot save + // the password provider away to be sure we got the same one that validated the password. + // If the password providers are written correctly this should never happen, + // because if they cannot read the name they would return a null. + // But on the off chance that something goes wrong with the translation because of a mismatch + // we try to skip the bad one. + LOG.debug("{} could not read name from {}", provider, orig, e); + } + } + // In the worst case we will return a serialized name after a password provider said that the password + // was okay. In that case the ACLs are likely to prevent the request from going through anyways. 
+ // But that is only if there is a bug in one of the password providers. + return orig; + } + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL Callback"); + } + } + + log("GOT", ac, nc, pc, rc); + + if (nc != null) { + String userName = nc.getDefaultName(); + boolean passwordFound = false; + for (PasswordProvider provider : providers) { + Optional<char[]> password = provider.getPasswordFor(userName); + if (password.isPresent()) { + pc.setPassword(password.get()); + nc.setName(provider.userName(userName)); + passwordFound = true; + break; + } + } + if (!passwordFound) { + LOG.warn("No password found for user: {}", userName); + throw new IOException("NOT ALLOWED."); + } + } + + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + + if (ac != null) { + String nid = ac.getAuthenticationID(); + if (nid != null) { + nid = translateName(nid); + } + + String zid = ac.getAuthorizationID(); + if (zid != null) { + zid = translateName(zid); + } + LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}", + nid, zid); + + //if authorizationId is not set, set it to authenticationId. 
+ if (zid == null) { + ac.setAuthorizedID(nid); + zid = nid; + } else { + ac.setAuthorizedID(zid); + } + + //When nid and zid are not equal, nid is attempting to impersonate zid. We + //add the nid as the real user in reqContext's subject which will be used during authorization. + if (!nid.equals(zid)) { + LOG.info("Impersonation attempt authenticationID = {} authorizationID = {}", + nid, zid); + ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(nid)); + } else { + ReqContext.context().setRealPrincipal(null); + } + + ac.setAuthorized(true); + } + log("FINISHED", ac, nc, pc, rc); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java new file mode 100644 index 0000000..b196ade --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth.workertoken; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.util.Base64; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.generated.PrivateWorkerKey; +import org.apache.storm.generated.WorkerTokenInfo; +import org.apache.storm.generated.WorkerTokenServiceType; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.storm.security.auth.sasl.PasswordProvider; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Allow for SASL authentication using worker tokens. + */ +public class WorkerTokenAuthorizer implements PasswordProvider { + private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class); + + private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) { + IStormClusterState state = null; + + if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) { + try { + state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return state; + } + + private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache; + + /** + * Constructor. 
+ * @param conf the daemon config for the server. + * @param connectionType the type of connection we are authorizing. + */ + public WorkerTokenAuthorizer(Map<String, Object> conf, ThriftConnectionType connectionType) { + this(connectionType.getWtType(), buildStateIfNeeded(conf, connectionType)); + } + + @VisibleForTesting + WorkerTokenAuthorizer(final WorkerTokenServiceType serviceType, final IStormClusterState state) { + LoadingCache<WorkerTokenInfo, PrivateWorkerKey> tmpKeyCache = null; + if (state != null) { + tmpKeyCache = + CacheBuilder.newBuilder() + .maximumSize(2_000) + .expireAfterWrite(2, TimeUnit.HOURS) + .build(new CacheLoader<WorkerTokenInfo, PrivateWorkerKey>() { + + @Override + public PrivateWorkerKey load(WorkerTokenInfo wtInfo) { + return state.getPrivateWorkerKey(serviceType, + wtInfo.get_topologyId(), + wtInfo.get_secretVersion()); + } + }); + } + keyCache = tmpKeyCache; + } + + @VisibleForTesting + byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) { + assert keyCache != null; + + if (deser.is_set_expirationTimeMillis() && deser.get_expirationTimeMillis() <= Time.currentTimeMillis()) { + throw new IllegalArgumentException("Token is not valid, token has expired."); + } + + PrivateWorkerKey key = keyCache.getUnchecked(deser); + if (key == null) { + throw new IllegalArgumentException("Token is not valid, private key not found."); + } + + if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) { + throw new IllegalArgumentException("Token is not valid, key has expired."); + } + + return WorkerTokenSigner.createPassword(user, new SecretKeySpec(key.get_key(), WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM)); + } + + @Override + public Optional<char[]> getPasswordFor(String userName) { + if (keyCache == null) { + return Optional.empty(); + } + try { + byte[] user = Base64.getDecoder().decode(userName); + WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class); + byte[] password = 
getSignedPasswordFor(user, deser); + return Optional.of(Base64.getEncoder().encodeToString(password).toCharArray()); + } catch (Exception e) { + LOG.debug("Could not decode {}, might just be a plain digest request...", userName, e); + return Optional.empty(); + } + } + + @Override + public String userName(String userName) { + byte[] user = Base64.getDecoder().decode(userName); + WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class); + return deser.get_userName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java new file mode 100644 index 0000000..8bdc1be --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.security.auth.workertoken; + +import java.security.AccessController; +import java.util.Base64; +import javax.security.auth.Subject; +import org.apache.storm.generated.WorkerToken; +import org.apache.storm.generated.WorkerTokenServiceType; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler; + +/** + * A Client callback handler for a WorkerToken. In general a client that wants to + * support worker tokens should first check if a WorkerToken is available for the + * specific connection type by calling findWorkerTokenInSubject. If that returns + * a token, then proceed to create and use this with a DIGEST-MD5 SaslClient. + * If not you should fall back to whatever other client auth you want to do. + */ +public class WorkerTokenClientCallbackHandler extends SimpleSaslClientCallbackHandler { + + /** + * Look in the current subject for a WorkerToken. This should really only happen + * when we are in a worker, because the tokens will not be placed in anything else. + * @param type the type of connection we need a token for. + * @return the found token or null. + */ + public static WorkerToken findWorkerTokenInSubject(ThriftConnectionType type) { + WorkerTokenServiceType serviceType = type.getWtType(); + WorkerToken ret = null; + if (serviceType != null) { + Subject subject = Subject.getSubject(AccessController.getContext()); + if (subject != null) { + ret = AuthUtils.findWorkerToken(subject, serviceType); + } + } + return ret; + } + + /** + * Constructor. + * @param token the token to use to authenticate. This was probably retrieved by calling findWorkerTokenInSubject. 
/**
 * Provides everything needed to sign a worker token with a secret key.
 */
class WorkerTokenSigner {
    /**
     * The name of the hashing algorithm.
     */
    static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA256";

    /**
     * A thread local store for the Macs. Each thread lazily creates its own {@link Mac}
     * instance, which is re-initialized with the caller's key on every use.
     */
    private static final ThreadLocal<Mac> threadLocalMac =
        ThreadLocal.withInitial(() -> {
            try {
                return Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
            } catch (NoSuchAlgorithmException nsa) {
                // Keep the original exception as the cause so the provider failure is not lost.
                throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.", nsa);
            }
        });

    /**
     * Compute HMAC of the identifier using the secret key and return the
     * output as password.
     *
     * @param identifier the bytes of the identifier
     * @param key the secret key
     * @return the bytes of the generated password
     * @throws IllegalArgumentException if the key is rejected by the HMAC implementation,
     *     or if the HMAC algorithm is not available in this JVM.
     */
    static byte[] createPassword(byte[] identifier, SecretKey key) {
        Mac mac = threadLocalMac.get();
        try {
            // init() discards any previous key/state held by this thread's Mac.
            mac.init(key);
        } catch (InvalidKeyException ike) {
            throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
        }
        return mac.doFinal(identifier);
    }
}
result) { http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index 73cfc81..e20e2ee 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -42,6 +42,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; @@ -81,6 +82,7 @@ import org.apache.storm.generated.Nimbus; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.WorkerToken; import org.apache.storm.security.auth.ReqContext; import org.apache.storm.serialization.DefaultSerializationDelegate; import org.apache.storm.serialization.SerializationDelegate; @@ -524,6 +526,14 @@ public class Utils { return ret.toString(); } + public static Id parseZkId(String id, String configName) { + String[] split = id.split(":", 2); + if (split.length != 2) { + throw new IllegalArgumentException(configName + " does not appear to be in the form scheme:acl, i.e. 
sasl:storm-user"); + } + return new Id(split[0], split[1]); + } + public static List<ACL> getWorkerACL(Map<String, Object> conf) { //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms if (!isZkAuthenticationConfiguredTopology(conf)) { @@ -533,12 +543,8 @@ public class Utils { if (stormZKUser == null) { throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set"); } - String[] split = stormZKUser.split(":", 2); - if (split.length != 2) { - throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); - } - ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL); - ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1]))); + ArrayList<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL); + ret.add(new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, Config.STORM_ZOOKEEPER_SUPERACL))); return ret; } @@ -693,6 +699,27 @@ public class Utils { return serializationDelegate.deserialize(serialized, clazz); } + /** + * Serialize an object using the configured serialization and then base64 encode it into a string. + * @param obj the object to encode + * @return a string with the encoded object in it. + */ + public static String serializeToString(Object obj) { + return Base64.getEncoder().encodeToString(serializationDelegate.serialize(obj)); + } + + /** + * Deserialize an object stored in a string. The String is assumed to be a base64 encoded string + * containing the bytes to actually deserialize. + * @param str the encoded string. + * @param clazz the thrift class we are expecting. 
+ * @param <T> The type of clazz + * @return the decoded object + */ + public static <T> T deserializeFromString(String str, Class<T> clazz) { + return deserialize(Base64.getDecoder().decode(str), clazz); + } + public static byte[] toByteArray(ByteBuffer buffer) { byte[] ret = new byte[buffer.remaining()]; buffer.get(ret, 0, ret.length);