http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java index d592bb7..7ed498b 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java @@ -20,27 +20,27 @@ package backtype.storm.security.auth; import java.util.Map; /** - * Nimbus could be configured with an authorization plugin. - * If not specified, all requests are authorized. + * Nimbus could be configured with an authorization plugin. If not specified, all requests are authorized. * - * You could specify the authorization plugin via storm parameter. For example: - * storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ... - * - * You could also specify it via storm.yaml: - * nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer + * You could specify the authorization plugin via storm parameter. For example: storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer + * ... + * + * You could also specify it via storm.yaml: nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer */ public interface IAuthorizer { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ void prepare(Map storm_conf); - + /** * permit() method is invoked for each incoming Thrift request. - * @param context request context includes info about + * + * @param context request context includes info about * @param operation operation name - * @param topology_storm configuration of targeted topology + * @param topology_storm configuration of targeted topology * @return true if the request is authorized, false if reject */ public boolean permit(ReqContext context, String operation, Map topology_conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java index b3886da..16841d5 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java @@ -23,8 +23,7 @@ import java.util.Map; import javax.security.auth.Subject; /** - * Provides a way to automatically push credentials to a topology and to - * retreave them in the worker. + * Provides a way to automatically push credentials to a topology and to retreave them in the worker. */ public interface IAutoCredentials { @@ -32,24 +31,26 @@ public interface IAutoCredentials { /** * Called to populate the credentials on the client side. + * * @param credentials the credentials to be populated. */ public void populateCredentials(Map<String, String> credentials); /** * Called to initially populate the subject on the worker side with credentials passed in. + * * @param subject the subject to optionally put credentials in. * @param credentials the credentials to be used. - */ + */ public void populateSubject(Subject subject, Map<String, String> credentials); - /** - * Called to update the subject on the worker side when new credentials are recieved. - * This means that populateSubject has already been called on this subject. + * Called to update the subject on the worker side when new credentials are recieved. This means that populateSubject has already been called on this + * subject. + * * @param subject the subject to optionally put credentials in. * @param credentials the credentials to be used. - */ + */ public void updateSubject(Subject subject, Map<String, String> credentials); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java index 3eaf6c4..34358f4 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java @@ -26,16 +26,18 @@ import java.util.Map; */ public interface ICredentialsRenewer { - /** - * Called when initializing the service. - * @param conf the storm cluster configuration. - */ - public void prepare(Map conf); + /** + * Called when initializing the service. + * + * @param conf the storm cluster configuration. + */ + public void prepare(Map conf); /** * Renew any credentials that need to be renewed. (Update the credentials if needed) + * * @param credentials the credentials that may have something to renew. * @param topologyConf topology configuration. - */ + */ public void renew(Map<String, String> credentials, Map topologyConf); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java index 5590b81..865e950 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java @@ -26,13 +26,14 @@ public interface IGroupMappingServiceProvider { /** * Invoked once immediately after construction + * * @param storm_conf Storm configuration */ void prepare(Map storm_conf); /** - * Get all various group memberships of a given user. - * Returns EMPTY list in case of non-existing user + * Get all various group memberships of a given user. Returns EMPTY list in case of non-existing user + * * @param user User's name * @return group memberships of user * @throws IOException http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java index a012ce4..66dfcee 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java @@ -29,20 +29,22 @@ import backtype.storm.security.auth.ReqContext; public interface IHttpCredentialsPlugin { /** * Invoked once immediately after construction + * * @param storm_conf Storm configuration */ void prepare(Map storm_conf); /** * Gets the user name from the request. + * * @param req the servlet request * @return the authenticated user, or null if none is authenticated. */ String getUserName(HttpServletRequest req); /** - * Populates a given context with credentials information from an HTTP - * request. + * Populates a given context with credentials information from an HTTP request. + * * @param req the servlet request * @return the context */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java index fca3d37..32b4564 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java @@ -22,18 +22,19 @@ import java.util.Map; import java.security.Principal; /** - * Storm can be configured to launch worker processed as a given user. - * Some transports need to map the Principal to a local user name. + * Storm can be configured to launch worker processed as a given user. Some transports need to map the Principal to a local user name. */ public interface IPrincipalToLocal { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ void prepare(Map storm_conf); - + /** * Convert a Principal to a local user name. + * * @param principal the principal to convert * @return The local user name. */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java index 5ba2557..c3c657f 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java @@ -37,25 +37,28 @@ import backtype.storm.security.auth.ThriftConnectionType; public interface ITransportPlugin { /** * Invoked once immediately after construction + * * @param type the type of connection this will process. - * @param storm_conf Storm configuration + * @param storm_conf Storm configuration * @param login_conf login configuration */ void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf); - + /** * Create a server associated with a given port, service handler, and purpose + * * @param processor service handler * @return server */ public TServer getServer(TProcessor processor) throws IOException, TTransportException; /** - * Connect to the specified server via framed transport + * Connect to the specified server via framed transport + * * @param transport The underlying Thrift transport. * @param serverHost server host - * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed. - * Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user. + * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed. Only applicable when using + * secure storm cluster. A null/blank value here will just indicate to use the logged in user. */ public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java index 35c7788..7ac6a6d 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java @@ -28,18 +28,21 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ - public void prepare(Map storm_conf) {} - + public void prepare(Map storm_conf) { + } + /** * Convert a Principal to a local user name. + * * @param principal the principal to convert * @return The local user name. */ public String toLocal(Principal principal) { - //This technically does not conform with rfc1964, but should work so - // long as you don't have any really odd names in your KDC. - return principal == null ? null : principal.getName().split("[/@]")[0]; + // This technically does not conform with rfc1964, but should work so + // long as you don't have any really odd names in your KDC. + return principal == null ? null : principal.getName().split("[/@]")[0]; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java index a252f85..47f317c 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java @@ -31,10 +31,7 @@ import java.security.Principal; import javax.security.auth.Subject; /** - * context request context includes info about - * (1) remote address, - * (2) remote subject and primary principal - * (3) request ID + * context request context includes info about (1) remote address, (2) remote subject and primary principal (3) request ID */ public class ReqContext { private static final AtomicInteger uniqueId = new AtomicInteger(0); @@ -46,39 +43,37 @@ public class ReqContext { private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class); - /** * Get a request context associated with current thread + * * @return */ public static ReqContext context() { return ctxt.get(); } - //each thread will have its own request context - private static final ThreadLocal < ReqContext > ctxt = - new ThreadLocal < ReqContext > () { - @Override + // each thread will have its own request context + private static final ThreadLocal<ReqContext> ctxt = new ThreadLocal<ReqContext>() { + @Override protected ReqContext initialValue() { return new ReqContext(AccessController.getContext()); } }; - //private constructor + // private constructor @VisibleForTesting public ReqContext(AccessControlContext acl_ctxt) { _subject = Subject.getSubject(acl_ctxt); _reqID = uniqueId.incrementAndGet(); } - //private constructor + // private constructor @VisibleForTesting public ReqContext(Subject sub) { _subject = sub; _reqID = uniqueId.incrementAndGet(); } - /** * client address */ @@ -108,15 +103,18 @@ public class ReqContext { * The primary principal associated current subject */ public Principal principal() { - if (_subject == null) return null; + if (_subject == null) + return null; Set<Principal> princs = _subject.getPrincipals(); - if (princs.size()==0) return null; + if (princs.size() == 0) + return null; return (Principal) (princs.toArray()[0]); } public void setRealPrincipal(Principal realPrincipal) { this.realPrincipal = realPrincipal; } + /** * The real principal associated with the subject. */ @@ -126,12 +124,13 @@ public class ReqContext { /** * Returns true if this request is an impersonation request. + * * @return */ public boolean isImpersonating() { return this.realPrincipal != null; } - + /** * request ID of this request */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java index 7208a17..9c8780d 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java @@ -73,11 +73,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin { int numWorkerThreads = type.getNumThreads(storm_conf); Integer queueSize = type.getQueueSize(storm_conf); - TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport). - processor(new TUGIWrapProcessor(processor)). - minWorkerThreads(numWorkerThreads). - maxWorkerThreads(numWorkerThreads). - protocolFactory(new TBinaryProtocol.Factory(false, true)); + TThreadPoolServer.Args server_args = + new TThreadPoolServer.Args(serverTransport).processor(new TUGIWrapProcessor(processor)).minWorkerThreads(numWorkerThreads) + .maxWorkerThreads(numWorkerThreads).protocolFactory(new TBinaryProtocol.Factory(false, true)); if (serverTransportFactory != null) { server_args.transportFactory(serverTransportFactory); @@ -86,26 +84,23 @@ public abstract class SaslTransportPlugin implements ITransportPlugin { if (queueSize != null) { workQueue = new ArrayBlockingQueue(queueSize); } - ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, - 60, TimeUnit.SECONDS, workQueue); + ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, workQueue); server_args.executorService(executorService); return new TThreadPoolServer(server_args); } /** * All subclass must implement this method + * * @return * @throws IOException */ protected abstract TTransportFactory getServerTransportFactory() throws IOException; - - /** - * Processor that pulls the SaslServer object out of the transport, and - * assumes the remote user's UGI before calling through to the original - * processor. - * - * This is used on the server side to set the UGI for each specific call. + /** + * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor. + * + * This is used on the server side to set the UGI for each specific call. */ private class TUGIWrapProcessor implements TProcessor { final TProcessor wrapped; @@ -115,25 +110,25 @@ public abstract class SaslTransportPlugin implements ITransportPlugin { } public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { - //populating request context + // populating request context ReqContext req_context = ReqContext.context(); TTransport trans = inProt.getTransport(); - //Sasl transport - TSaslServerTransport saslTrans = (TSaslServerTransport)trans; - //remote address - TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport(); + // Sasl transport + TSaslServerTransport saslTrans = (TSaslServerTransport) trans; + // remote address + TSocket tsocket = (TSocket) saslTrans.getUnderlyingTransport(); Socket socket = tsocket.getSocket(); req_context.setRemoteAddress(socket.getInetAddress()); - //remote subject + // remote subject SaslServer saslServer = saslTrans.getSaslServer(); String authId = saslServer.getAuthorizationID(); Subject remoteUser = new Subject(); remoteUser.getPrincipals().add(new User(authId)); req_context.setSubject(remoteUser); - //invoke service handler + // invoke service handler return wrapped.process(inProt, outProt); } } @@ -142,11 +137,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin { private final String name; public User(String name) { - this.name = name; + this.name = name; } - /** - * Get the full name of the user. + /** + * Get the full name of the user. */ public String getName() { return name; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java index 62a4c7e..16f2fe4 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java @@ -31,15 +31,14 @@ import backtype.storm.utils.ShellUtils.ExitCodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public class ShellBasedGroupsMapping implements - IGroupMappingServiceProvider { +public class ShellBasedGroupsMapping implements IGroupMappingServiceProvider { public static Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class); public TimeCacheMap<String, Set<String>> cachedGroups; /** * Invoked once immediately after construction + * * @param storm_conf Storm configuration */ @Override @@ -50,24 +49,24 @@ public class ShellBasedGroupsMapping implements /** * Returns list of groups for a user - * + * * @param user get groups for this user * @return list of groups for a given user */ @Override public Set<String> getGroups(String user) throws IOException { - if(cachedGroups.containsKey(user)) { + if (cachedGroups.containsKey(user)) { return cachedGroups.get(user); } Set<String> groups = getUnixGroups(user); - if(!groups.isEmpty()) - cachedGroups.put(user,groups); + if (!groups.isEmpty()) + cachedGroups.put(user, groups); return groups; } /** - * Get the current user's group list from Unix by running the command 'groups' - * NOTE. For non-existing user it will return EMPTY list + * Get the current user's group list from Unix by running the command 'groups' NOTE. For non-existing user it will return EMPTY list + * * @param user user name * @return the groups set that the <code>user</code> belongs to * @throws IOException if encounter any error when running the command @@ -82,8 +81,7 @@ public class ShellBasedGroupsMapping implements return new HashSet<String>(); } - StringTokenizer tokenizer = - new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX); + StringTokenizer tokenizer = new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX); Set<String> groups = new HashSet<String>(); while (tokenizer.hasMoreTokens()) { groups.add(tokenizer.nextToken()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java index 2abcdae..c7e816f 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java @@ -73,22 +73,21 @@ public class SimpleTransportPlugin implements ITransportPlugin { int maxBufferSize = type.getMaxBufferSize(storm_conf); Integer queueSize = type.getQueueSize(storm_conf); - THsHaServer.Args server_args = new THsHaServer.Args(serverTransport). - processor(new SimpleWrapProcessor(processor)). - workerThreads(numWorkerThreads). - protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1)); + THsHaServer.Args server_args = + new THsHaServer.Args(serverTransport).processor(new SimpleWrapProcessor(processor)).workerThreads(numWorkerThreads) + .protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1)); if (queueSize != null) { - server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, - 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize))); + server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize))); } - //construct THsHaServer + // construct THsHaServer return new THsHaServer(server_args); } /** - * Connect to the specified server via framed transport + * Connect to the specified server via framed transport + * * @param transport The underlying Thrift transport. * @param serverHost unused. * @param asUser unused. @@ -96,10 +95,10 @@ public class SimpleTransportPlugin implements ITransportPlugin { @Override public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException { int maxBufferSize = type.getMaxBufferSize(storm_conf); - //create a framed transport + // create a framed transport TTransport conn = new TFramedTransport(transport, maxBufferSize); - //connect + // connect conn.open(); LOG.debug("Simple client transport has been established"); @@ -108,13 +107,13 @@ public class SimpleTransportPlugin implements ITransportPlugin { /** * @return the subject that will be used for all connections - */ + */ protected Subject getDefaultSubject() { return null; } - /** - * Processor that populate simple transport info into ReqContext, and then invoke a service handler + /** + * Processor that populate simple transport info into ReqContext, and then invoke a service handler */ private class SimpleWrapProcessor implements TProcessor { final TProcessor wrapped; @@ -124,7 +123,7 @@ public class SimpleTransportPlugin implements ITransportPlugin { } public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { - //populating request context + // populating request context ReqContext req_context = ReqContext.context(); TTransport trans = inProt.getTransport(); @@ -133,31 +132,36 @@ public class SimpleTransportPlugin implements ITransportPlugin { req_context.setRemoteAddress(InetAddress.getLocalHost()); } catch (UnknownHostException e) { throw new RuntimeException(e); - } + } } else if (trans instanceof TSocket) { - TSocket tsocket = (TSocket)trans; - //remote address + TSocket tsocket = (TSocket) trans; + // remote address Socket socket = tsocket.getSocket(); - req_context.setRemoteAddress(socket.getInetAddress()); - } + req_context.setRemoteAddress(socket.getInetAddress()); + } - //anonymous user + // anonymous user Subject s = getDefaultSubject(); if (s == null) { - final String user = (String)storm_conf.get("debug.simple.transport.user"); - if (user != null) { - HashSet<Principal> principals = new HashSet<Principal>(); - principals.add(new Principal() { - public String getName() { return user; } - public String toString() { return user; } - }); - s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>()); - } + final String user = (String) storm_conf.get("debug.simple.transport.user"); + if (user != null) { + HashSet<Principal> principals = new HashSet<Principal>(); + principals.add(new Principal() { + public String getName() { + return user; + } + + public String toString() { + return user; + } + }); + s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>()); + } } req_context.setSubject(s); - //invoke service handler + // invoke service handler return wrapped.process(inProt, outProt); } - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java index 6af17fa..fd9e694 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java @@ -34,7 +34,7 @@ public class SingleUserPrincipal implements Principal { @Override public boolean equals(Object another) { if (another instanceof SingleUserPrincipal) { - return _userName.equals(((SingleUserPrincipal)another)._userName); + return _userName.equals(((SingleUserPrincipal) another)._userName); } return false; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java index f547868..b699bc4 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java @@ -35,15 +35,13 @@ public class TBackoffConnect { public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) { _retryTimes = retryTimes; - waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval, - retryIntervalCeiling, - retryTimes); + waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval, retryIntervalCeiling, retryTimes); } public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException { boolean connected = false; TTransport transportResult = null; - while(!connected) { + while (!connected) { try { transportResult = transportPlugin.connect(underlyingTransport, host, asUser); connected = true; @@ -55,13 +53,13 @@ public class TBackoffConnect { } private void retryNext(TTransportException ex) { - if(!canRetry()) { + if (!canRetry()) { throw new RuntimeException(ex); } try { int sleeptime = waitGrabber.getSleepTimeMs(_completedRetries, 0); - LOG.debug("Failed to connect. Retrying... (" + Integer.toString( _completedRetries) + ") in " + Integer.toString(sleeptime) + "ms"); + LOG.debug("Failed to connect. Retrying... (" + Integer.toString(_completedRetries) + ") in " + Integer.toString(sleeptime) + "ms"); Thread.sleep(sleeptime); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java index 8d2136a..954b4f8 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java @@ -45,17 +45,17 @@ public class ThriftClient { private String hostPort; private String host; private Integer port; - + private Map<Object, Object> conf; - + private Integer timeout; private ThriftConnectionType type; private String asUser; - + public ThriftClient(Map conf, ThriftConnectionType type) throws Exception { this(conf, type, null, null, null, null); } - + @SuppressWarnings("unchecked") public ThriftClient(Map conf, ThriftConnectionType type, Integer timeout) throws Exception { this(conf, type, null, null, timeout, null); @@ -63,6 +63,7 @@ public class ThriftClient { /** * This is only for be compatible for Storm + * * @param conf * @param type * @param host @@ -71,45 +72,39 @@ public class ThriftClient { this(conf, type, host, null, null, null); } - public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout){ + public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout) { this(conf, type, host, port, timeout, null); } public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) { - //create a socket with server - + // create a socket with server + this.timeout = timeout; this.conf = conf; this.type = type; this.asUser = asUser; - + getMaster(conf, host, port); reconnect(); } - - - + public static String getMasterByZk(Map conf) throws Exception { - CuratorFramework zkobj = null; String masterHost = null; - + try { String root = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)); String zkMasterDir = root + Cluster.MASTER_SUBTREE; - - zkobj = Utils.newCurator(conf, - (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), - conf.get(Config.STORM_ZOOKEEPER_PORT), - zkMasterDir); + + zkobj = Utils.newCurator(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), zkMasterDir); zkobj.start(); if (zkobj.checkExists().forPath("/") == null) { throw new RuntimeException("No alive nimbus "); } - + masterHost = new String(zkobj.getData().forPath("/")); - + LOG.info("masterHost:" + masterHost); return masterHost; } finally { @@ -119,8 +114,8 @@ public class ThriftClient { } } } - - public void getMaster(Map conf, String host, Integer port){ + + public void getMaster(Map conf, String host, Integer port) { if (StringUtils.isBlank(host) == false) { this.host = host; if (port == null) { @@ -128,7 +123,7 @@ public class ThriftClient { } this.port = port; this.hostPort = host + ":" + port; - }else { + } else { try { hostPort = ThriftClient.getMasterByZk(conf); } catch (Exception e) { @@ -142,7 +137,7 @@ public class ThriftClient { this.host = host_port[0]; this.port = Integer.parseInt(host_port[1]); } - + // create a socket with server if (this.host == null) { throw new IllegalArgumentException("host is not set"); @@ -151,45 +146,43 @@ public class ThriftClient { throw new IllegalArgumentException("invalid port: " + port); } } - + public synchronized TTransport transport() { return _transport; } - + public synchronized void reconnect() { - close(); + close(); try { TSocket socket = new TSocket(host, port); - if(timeout!=null) { + if (timeout != null) { socket.setTimeout(timeout); - }else { - //@@@ Todo + } else { + // @@@ Todo // set the socket default Timeout as xxxx } - //locate login configuration + // locate login configuration Configuration login_conf = AuthUtils.GetConfiguration(conf); - //construct a transport plugin + // construct a transport plugin ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(type, conf, login_conf); final TTransport underlyingTransport = socket; - //TODO get this from type instead of hardcoding to Nimbus. - //establish client-server transport via plugin - //do retries if the connect fails - TBackoffConnect connectionRetry - = new TBackoffConnect( - Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), - Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)), - Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING))); + // TODO get this from type instead of hardcoding to Nimbus. + // establish client-server transport via plugin + // do retries if the connect fails + TBackoffConnect connectionRetry = + new TBackoffConnect(Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)), + Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING))); _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, host, asUser); } catch (IOException ex) { throw new RuntimeException(ex); } _protocol = null; if (_transport != null) { - _protocol = new TBinaryProtocol(_transport); + _protocol = new TBinaryProtocol(_transport); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java index f9be7ae..e248df8 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java @@ -26,12 +26,9 @@ import java.util.Map; * The purpose for which the Thrift server is created. */ public enum ThriftConnectionType { - NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null, - Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), - DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE, - Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE), - DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null, - Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE); + NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null, Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), DRPC( + Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE, Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE), DRPC_INVOCATIONS( + Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null, Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE); private final String _transConf; private final String _portConf; @@ -39,8 +36,7 @@ public enum ThriftConnectionType { private final String _threadsConf; private final String _buffConf; - ThriftConnectionType(String transConf, String portConf, String qConf, - String threadsConf, String buffConf) { + ThriftConnectionType(String transConf, String portConf, String qConf, String threadsConf, String buffConf) { _transConf = transConf; _portConf = portConf; _qConf = qConf; @@ -49,9 +45,9 @@ public enum ThriftConnectionType { } public String getTransportPlugin(Map conf) { - String ret = (String)conf.get(_transConf); + String ret = (String) conf.get(_transConf); if (ret == null) { - ret = (String)conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN); + ret = (String) conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN); } return ret; } @@ -64,10 +60,10 @@ public enum ThriftConnectionType { if (_qConf == null) { return null; } - return (Integer)conf.get(_qConf); + return (Integer) conf.get(_qConf); } - public int getNumThreads(Map conf) { + public int getNumThreads(Map conf) { return Utils.getInt(conf.get(_threadsConf)); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java index 64243ce..410f1ce 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java @@ -28,19 +28,19 @@ import org.slf4j.LoggerFactory; public class ThriftServer { private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); - private Map _storm_conf; //storm configuration + private Map _storm_conf; // storm configuration protected TProcessor _processor = null; private final ThriftConnectionType _type; private TServer _server = null; private Configuration _login_conf; - + public ThriftServer(Map storm_conf, TProcessor processor, ThriftConnectionType type) { _storm_conf = storm_conf; _processor = processor; _type = type; try { - //retrieve authentication configuration + // retrieve authentication configuration _login_conf = AuthUtils.GetConfiguration(_storm_conf); } catch (Exception x) { LOG.error(x.getMessage(), x); @@ -54,27 +54,30 @@ public class ThriftServer { /** * Is ThriftServer listening to requests? + * * @return */ public boolean isServing() { - if (_server == null) return false; + if (_server == null) + return false; return _server.isServing(); } - - public void serve() { + + public void serve() { try { - //locate our thrift transport plugin - ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf); + // locate our thrift transport plugin + ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf); - //server + // server _server = transportPlugin.getServer(_processor); - //start accepting requests + // start accepting requests _server.serve(); } catch (Exception ex) { LOG.error("ThriftServer is being stopped due to: " + ex, ex); - if (_server != null) _server.stop(); - Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more + if (_server != null) + _server.stop(); + Runtime.getRuntime().halt(1); // shutdown server process since we could not handle Thrift requests any more } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java index 8951edd..11c4a0f 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java @@ -22,9 +22,10 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer { abstract protected boolean permitClientRequest(ReqContext context, String operation, Map params); abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map params); - + /** * Authorizes request from to the DRPC server. + * * @param context the client request context * @param operation the operation requested by the DRPC server * @param params a Map with any key-value entries of use to the authorization implementation @@ -33,14 +34,11 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer { public boolean permit(ReqContext context, String operation, Map params) { if ("execute".equals(operation)) { return permitClientRequest(context, operation, params); - } else if ("failRequest".equals(operation) || - "fetchRequest".equals(operation) || - "result".equals(operation)) { + } else if ("failRequest".equals(operation) || "fetchRequest".equals(operation) || "result".equals(operation)) { return permitInvocationRequest(context, operation, params); } // Deny unsupported operations. - LOG.warn("Denying unsupported operation \""+operation+"\" from "+ - context.remoteAddress()); + LOG.warn("Denying unsupported operation \"" + operation + "\" from " + context.remoteAddress()); return false; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java index 45eaea5..8aa7243 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java @@ -19,8 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { - public static Logger LOG = - LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class); + public static Logger LOG = LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class); public static final String CLIENT_USERS_KEY = "client.users"; public static final String INVOCATION_USER_KEY = "invocation.user"; @@ -33,44 +32,35 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { protected class AclFunctionEntry { final public Set<String> clientUsers; final public String invocationUser; - public AclFunctionEntry(Collection<String> clientUsers, - String invocationUser) { - this.clientUsers = (clientUsers != null) ? - new HashSet<String>(clientUsers) : new HashSet<String>(); + + public AclFunctionEntry(Collection<String> clientUsers, String invocationUser) { + this.clientUsers = (clientUsers != null) ? new HashSet<String>(clientUsers) : new HashSet<String>(); this.invocationUser = invocationUser; } } - private volatile Map<String,AclFunctionEntry> _acl = null; + private volatile Map<String, AclFunctionEntry> _acl = null; private volatile long _lastUpdate = 0; - protected Map<String,AclFunctionEntry> readAclFromConfig() { - //Thread safety is mostly around _acl. If _acl needs to be updated it is changed atomically - //More then one thread may be trying to update it at a time, but that is OK, because the - //change is atomic + protected Map<String, AclFunctionEntry> readAclFromConfig() { + // Thread safety is mostly around _acl. If _acl needs to be updated it is changed atomically + // More then one thread may be trying to update it at a time, but that is OK, because the + // change is atomic long now = System.currentTimeMillis(); if ((now - 5000) > _lastUpdate || _acl == null) { - Map<String,AclFunctionEntry> acl = new HashMap<String,AclFunctionEntry>(); + Map<String, AclFunctionEntry> acl = new HashMap<String, AclFunctionEntry>(); Map conf = Utils.findAndReadConfigFile(_aclFileName); if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) { - Map<String,Map<String,?>> confAcl = - (Map<String,Map<String,?>>) - conf.get(Config.DRPC_AUTHORIZER_ACL); + Map<String, Map<String, ?>> confAcl = (Map<String, Map<String, ?>>) conf.get(Config.DRPC_AUTHORIZER_ACL); for (String function : confAcl.keySet()) { - Map<String,?> val = confAcl.get(function); - Collection<String> clientUsers = - val.containsKey(CLIENT_USERS_KEY) ? - (Collection<String>) val.get(CLIENT_USERS_KEY) : null; - String invocationUser = - val.containsKey(INVOCATION_USER_KEY) ? - (String) val.get(INVOCATION_USER_KEY) : null; - acl.put(function, - new AclFunctionEntry(clientUsers, invocationUser)); + Map<String, ?> val = confAcl.get(function); + Collection<String> clientUsers = val.containsKey(CLIENT_USERS_KEY) ? (Collection<String>) val.get(CLIENT_USERS_KEY) : null; + String invocationUser = val.containsKey(INVOCATION_USER_KEY) ? (String) val.get(INVOCATION_USER_KEY) : null; + acl.put(function, new AclFunctionEntry(clientUsers, invocationUser)); } } else if (!_permitWhenMissingFunctionEntry) { - LOG.warn("Requiring explicit ACL entries, but none given. " + - "Therefore, all operiations will be denied."); + LOG.warn("Requiring explicit ACL entries, but none given. " + "Therefore, all operiations will be denied."); } _acl = acl; _lastUpdate = System.currentTimeMillis(); @@ -80,10 +70,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { @Override public void prepare(Map conf) { - Boolean isStrict = - (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT); - _permitWhenMissingFunctionEntry = - (isStrict != null && !isStrict) ? true : false; + Boolean isStrict = (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT); + _permitWhenMissingFunctionEntry = (isStrict != null && !isStrict) ? true : false; _aclFileName = (String) conf.get(Config.DRPC_AUTHORIZER_ACL_FILENAME); _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf); } @@ -105,11 +93,10 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { return null; } - protected boolean permitClientOrInvocationRequest(ReqContext context, Map params, - String fieldName) { - Map<String,AclFunctionEntry> acl = readAclFromConfig(); + protected boolean permitClientOrInvocationRequest(ReqContext context, Map params, String fieldName) { + Map<String, AclFunctionEntry> acl = readAclFromConfig(); String function = (String) params.get(FUNCTION_KEY); - if (function != null && ! function.isEmpty()) { + if (function != null && !function.isEmpty()) { AclFunctionEntry entry = acl.get(function); if (entry == null && _permitWhenMissingFunctionEntry) { return true; @@ -126,16 +113,11 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { String principal = getUserFromContext(context); String user = getLocalUserFromContext(context); if (value == null) { - LOG.warn("Configuration for function '"+function+"' is "+ - "invalid: it should have both an invocation user "+ - "and a list of client users defined."); - } else if (value instanceof Set && - (((Set<String>)value).contains(principal) || - ((Set<String>)value).contains(user))) { + LOG.warn("Configuration for function '" + function + "' is " + "invalid: it should have both an invocation user " + + "and a list of client users defined."); + } else if (value instanceof Set && (((Set<String>) value).contains(principal) || ((Set<String>) value).contains(user))) { return true; - } else if (value instanceof String && - (value.equals(principal) || - value.equals(user))) { + } else if (value instanceof String && (value.equals(principal) || value.equals(user))) { return true; } } @@ -144,14 +126,12 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase { } @Override - protected boolean permitClientRequest(ReqContext context, String operation, - Map params) { + protected boolean permitClientRequest(ReqContext context, String operation, Map params) { return permitClientOrInvocationRequest(context, params, "clientUsers"); } @Override - protected boolean permitInvocationRequest(ReqContext context, String operation, - Map params) { + protected boolean permitInvocationRequest(ReqContext context, String operation, Map params) { return permitClientOrInvocationRequest(context, params, "invocationUser"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java index 5e84b38..32f809a 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java @@ -31,27 +31,27 @@ import org.slf4j.LoggerFactory; */ public class DenyAuthorizer implements IAuthorizer { private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class); - + /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ - public void prepare(Map conf) { + public void prepare(Map conf) { } /** * permit() method is invoked for each incoming Thrift request - * @param contrext request context + * + * @param contrext request context * @param operation operation name - * @param topology_storm configuration of targeted topology + * @param topology_storm configuration of targeted topology * @return true if the request is authorized, false if reject */ public boolean permit(ReqContext context, String operation, Map topology_conf) { - LOG.info("[req "+ context.requestID()+ "] Access " - + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString()) - + (context.principal() == null? "" : (" principal:"+ context.principal())) - +" op:"+operation - + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME)))); + LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString()) + + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation + + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME)))); return false; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java index d6431be..e1a037f 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java @@ -10,7 +10,6 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; - public class ImpersonationAuthorizer implements IAuthorizer { private static final Logger LOG = LoggerFactory.getLogger(ImpersonationAuthorizer.class); protected static final String WILD_CARD = "*"; @@ -49,16 +48,16 @@ public class ImpersonationAuthorizer implements IAuthorizer { String userBeingImpersonated = _ptol.toLocal(context.principal()); InetAddress remoteAddress = context.remoteAddress(); - LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}", - impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress); + LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}", impersonatingUser, + impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress); /** * no config is present for impersonating principal or user, do not permit impersonation. */ if (!userImpersonationACL.containsKey(impersonatingPrincipal) && !userImpersonationACL.containsKey(impersonatingUser)) { - LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal." + - "Please see SECURITY.MD to learn how to configure users for impersonation." - , impersonatingUser, impersonatingPrincipal, userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL); + LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal." + + "Please see SECURITY.MD to learn how to configure users for impersonation.", impersonatingUser, impersonatingPrincipal, + userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL); return false; } @@ -78,18 +77,17 @@ public class ImpersonationAuthorizer implements IAuthorizer { authorizedGroups.addAll(userACL.authorizedGroups); } - LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ", - impersonatingUser, impersonatingPrincipal, authorizedGroups, authorizedHosts); + LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ", impersonatingUser, impersonatingPrincipal, + authorizedGroups, authorizedHosts); if (!isAllowedToImpersonateFromHost(authorizedHosts, remoteAddress)) { - LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ", - impersonatingUser, impersonatingPrincipal, remoteAddress); + LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ", impersonatingUser, impersonatingPrincipal, remoteAddress); return false; } if (!isAllowedToImpersonateUser(authorizedGroups, userBeingImpersonated)) { - LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.", - impersonatingUser, impersonatingPrincipal, userBeingImpersonated); + LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.", impersonatingUser, impersonatingPrincipal, + userBeingImpersonated); return false; } @@ -98,14 +96,12 @@ public class ImpersonationAuthorizer implements IAuthorizer { } private boolean isAllowedToImpersonateFromHost(Set<String> authorizedHosts, InetAddress remoteAddress) { - return authorizedHosts.contains(WILD_CARD) || - authorizedHosts.contains(remoteAddress.getCanonicalHostName()) || - authorizedHosts.contains(remoteAddress.getHostName()) || - authorizedHosts.contains(remoteAddress.getHostAddress()); + return authorizedHosts.contains(WILD_CARD) || authorizedHosts.contains(remoteAddress.getCanonicalHostName()) + || authorizedHosts.contains(remoteAddress.getHostName()) || authorizedHosts.contains(remoteAddress.getHostAddress()); } private boolean isAllowedToImpersonateUser(Set<String> authorizedGroups, String userBeingImpersonated) { - if(authorizedGroups.contains(WILD_CARD)) { + if (authorizedGroups.contains(WILD_CARD)) { return true; } @@ -131,9 +127,9 @@ public class ImpersonationAuthorizer implements IAuthorizer { protected class ImpersonationACL { public String impersonatingUser; - //Groups this user is authorized to impersonate. + // Groups this user is authorized to impersonate. public Set<String> authorizedGroups; - //Hosts this user is authorized to impersonate from. + // Hosts this user is authorized to impersonate from. public Set<String> authorizedHosts; private ImpersonationACL(String impersonatingUser, Set<String> authorizedGroups, Set<String> authorizedHosts) { @@ -144,11 +140,8 @@ public class ImpersonationAuthorizer implements IAuthorizer { @Override public String toString() { - return "ImpersonationACL{" + - "impersonatingUser='" + impersonatingUser + '\'' + - ", authorizedGroups=" + authorizedGroups + - ", authorizedHosts=" + authorizedHosts + - '}'; + return "ImpersonationACL{" + "impersonatingUser='" + impersonatingUser + '\'' + ", authorizedGroups=" + authorizedGroups + ", authorizedHosts=" + + authorizedHosts + '}'; } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java index 9af44d3..1d88202 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java @@ -34,24 +34,24 @@ public class NoopAuthorizer implements IAuthorizer { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ - public void prepare(Map conf) { + public void prepare(Map conf) { } /** * permit() method is invoked for each incoming Thrift request - * @param context request context includes info about + * + * @param context request context includes info about * @param operation operation name - * @param topology_storm configuration of targeted topology + * @param topology_storm configuration of targeted topology * @return true if the request is authorized, false if reject */ public boolean permit(ReqContext context, String operation, Map topology_conf) { - LOG.info("[req "+ context.requestID()+ "] Access " - + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString()) - + (context.principal() == null? "" : (" principal:"+ context.principal())) - +" op:"+operation - + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME)))); + LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString()) + + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation + + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME)))); return true; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java index e50a587..40d7a5d 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java @@ -36,15 +36,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An authorization implementation that simply checks if a user is allowed to perform specific - * operations. + * An authorization implementation that simply checks if a user is allowed to perform specific operations. */ public class SimpleACLAuthorizer implements IAuthorizer { private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class); protected Set<String> _userCommands = new HashSet<String>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo")); protected Set<String> _supervisorCommands = new HashSet<String>(Arrays.asList("fileDownload")); - protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology","rebalance","activate","deactivate","getTopologyConf","getTopology","getUserTopology","getTopologyInfo","uploadNewCredentials")); + protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology", "rebalance", "activate", "deactivate", "getTopologyConf", + "getTopology", "getUserTopology", "getTopologyInfo", "uploadNewCredentials")); protected Set<String> _admins; protected Set<String> _supervisors; @@ -52,8 +52,10 @@ public class SimpleACLAuthorizer implements IAuthorizer { protected Set<String> _nimbusGroups; protected IPrincipalToLocal _ptol; protected IGroupMappingServiceProvider _groupMappingProvider; + /** * Invoked once immediately after construction + * * @param conf Storm configuration */ @Override @@ -64,17 +66,17 @@ public class SimpleACLAuthorizer implements IAuthorizer { _nimbusGroups = new HashSet<String>(); if (conf.containsKey(Config.NIMBUS_ADMINS)) { - _admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS)); + _admins.addAll((Collection<String>) conf.get(Config.NIMBUS_ADMINS)); } if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) { - _supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS)); + _supervisors.addAll((Collection<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS)); } if (conf.containsKey(Config.NIMBUS_USERS)) { - _nimbusUsers.addAll((Collection<String>)conf.get(Config.NIMBUS_USERS)); + _nimbusUsers.addAll((Collection<String>) conf.get(Config.NIMBUS_USERS)); } if (conf.containsKey(Config.NIMBUS_GROUPS)) { - _nimbusGroups.addAll((Collection<String>)conf.get(Config.NIMBUS_GROUPS)); + _nimbusGroups.addAll((Collection<String>) conf.get(Config.NIMBUS_GROUPS)); } _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf); @@ -83,6 +85,7 @@ public class SimpleACLAuthorizer implements IAuthorizer { /** * permit() method is invoked for each incoming Thrift request + * * @param context request context includes info about * @param operation operation name * @param topology_conf configuration of targeted topology @@ -90,10 +93,8 @@ public class SimpleACLAuthorizer implements IAuthorizer { */ @Override public boolean permit(ReqContext context, String operation, Map topology_conf) { - LOG.info("[req " + context.requestID() + "] Access " - + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString()) - + (context.principal() == null ? "" : (" principal:" + context.principal())) - + " op:" + operation + LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString()) + + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME)))); String principal = context.principal().getName(); @@ -103,8 +104,8 @@ public class SimpleACLAuthorizer implements IAuthorizer { if (_groupMappingProvider != null) { try { userGroups = _groupMappingProvider.getGroups(user); - } catch(IOException e) { - LOG.warn("Error while trying to fetch user groups",e); + } catch (IOException e) { + LOG.warn("Error while trying to fetch user groups", e); } } @@ -123,7 +124,7 @@ public class SimpleACLAuthorizer implements IAuthorizer { if (_topoCommands.contains(operation)) { Set topoUsers = new HashSet<String>(); if (topology_conf.containsKey(Config.TOPOLOGY_USERS)) { - topoUsers.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_USERS)); + topoUsers.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_USERS)); } if (topoUsers.contains(principal) || topoUsers.contains(user)) { @@ -132,18 +133,19 @@ public class SimpleACLAuthorizer implements IAuthorizer { Set<String> topoGroups = new HashSet<String>(); if (topology_conf.containsKey(Config.TOPOLOGY_GROUPS) && topology_conf.get(Config.TOPOLOGY_GROUPS) != null) { - topoGroups.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_GROUPS)); + topoGroups.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_GROUPS)); } - if (checkUserGroupAllowed(userGroups, topoGroups)) return true; + if (checkUserGroupAllowed(userGroups, topoGroups)) + return true; } return false; } private Boolean checkUserGroupAllowed(Set<String> userGroups, Set<String> configuredGroups) { - if(userGroups.size() > 0 && configuredGroups.size() > 0) { + if (userGroups.size() > 0 && configuredGroups.size() > 0) { for (String tgroup : configuredGroups) { - if(userGroups.contains(tgroup)) + if (userGroups.contains(tgroup)) return true; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java index 55109f9..dbbc945 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java @@ -31,8 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An authorization implementation that simply checks a whitelist of users that - * are allowed to use the cluster. + * An authorization implementation that simply checks a whitelist of users that are allowed to use the cluster. */ public class SimpleWhitelistAuthorizer implements IAuthorizer { private static final Logger LOG = LoggerFactory.getLogger(SimpleWhitelistAuthorizer.class); @@ -41,30 +40,30 @@ public class SimpleWhitelistAuthorizer implements IAuthorizer { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ @Override public void prepare(Map conf) { users = new HashSet<String>(); if (conf.containsKey(WHITELIST_USERS_CONF)) { - users.addAll((Collection<String>)conf.get(WHITELIST_USERS_CONF)); + users.addAll((Collection<String>) conf.get(WHITELIST_USERS_CONF)); } } /** * permit() method is invoked for each incoming Thrift request - * @param context request context includes info about + * + * @param context request context includes info about * @param operation operation name - * @param topology_storm configuration of targeted topology + * @param topology_storm configuration of targeted topology * @return true if the request is authorized, false if reject */ @Override public boolean permit(ReqContext context, String operation, Map topology_conf) { - LOG.info("[req "+ context.requestID()+ "] Access " - + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString()) - + (context.principal() == null? "" : (" principal:"+ context.principal())) - +" op:"+operation - + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME)))); + LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString()) + + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation + + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME)))); return context.principal() != null ? users.contains(context.principal().getName()) : false; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java index 3caacaa..0e3f626 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory; import backtype.storm.security.auth.AuthUtils; /** - * client side callback handler. + * client side callback handler. */ public class ClientCallbackHandler implements CallbackHandler { private static final String USERNAME = "username"; @@ -51,28 +51,29 @@ public class ClientCallbackHandler implements CallbackHandler { * @throws IOException */ public ClientCallbackHandler(Configuration configuration) throws IOException { - if (configuration == null) return; + if (configuration == null) + return; AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT - + "' entry in this configuration: Client cannot start."; + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start."; throw new IOException(errorMessage); } _password = ""; - for(AppConfigurationEntry entry: configurationEntries) { + for (AppConfigurationEntry entry : configurationEntries) { if (entry.getOptions().get(USERNAME) != null) { - _username = (String)entry.getOptions().get(USERNAME); + _username = (String) entry.getOptions().get(USERNAME); } if (entry.getOptions().get(PASSWORD) != null) { - _password = (String)entry.getOptions().get(PASSWORD); + _password = (String) entry.getOptions().get(PASSWORD); } } } /** * This method is invoked by SASL for authentication challenges - * @param callbacks a collection of challenge callbacks + * + * @param callbacks a collection of challenge callbacks */ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { for (Callback c : callbacks) { @@ -82,10 +83,10 @@ public class ClientCallbackHandler implements CallbackHandler { nc.setName(_username); } else if (c instanceof PasswordCallback) { LOG.debug("password callback"); - PasswordCallback pc = (PasswordCallback)c; + PasswordCallback pc = (PasswordCallback) c; if (_password != null) { pc.setPassword(_password.toCharArray()); - } + } } else if (c instanceof AuthorizeCallback) { LOG.debug("authorization callback"); AuthorizeCallback ac = (AuthorizeCallback) c; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java index ad642d8..7b497c6 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java @@ -38,11 +38,11 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin { public static final String DIGEST = "DIGEST-MD5"; private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class); - protected TTransportFactory getServerTransportFactory() throws IOException { - //create an authentication callback handler + protected TTransportFactory getServerTransportFactory() throws IOException { + // create an authentication callback handler CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf); - //create a transport factory that will invoke our auth callback for digest + // create a transport factory that will invoke our auth callback for digest TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory(); factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler); @@ -53,13 +53,8 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin { @Override public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException { ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); - TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST, - null, - AuthUtils.SERVICE, - serverHost, - null, - client_callback_handler, - transport); + TSaslClientTransport wrapper_transport = + new TSaslClientTransport(DIGEST, null, AuthUtils.SERVICE, serverHost, null, client_callback_handler, transport); wrapper_transport.open(); LOG.debug("SASL DIGEST-MD5 client transport has been established");
