http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index 5d942b2..2ae3605 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -241,6 +241,20 @@ class HBServerMessageType: "NOT_AUTHORIZED": 18, } +class WorkerTokenServiceType: + NIMBUS = 0 + DRPC = 1 + + _VALUES_TO_NAMES = { + 0: "NIMBUS", + 1: "DRPC", + } + + _NAMES_TO_VALUES = { + "NIMBUS": 0, + "DRPC": 1, + } + class JavaObjectArg: """ @@ -12910,3 +12924,309 @@ class HBExecutionException(TException): def __ne__(self, other): return not (self == other) + +class WorkerTokenInfo: + """ + Attributes: + - userName + - topologyId + - secretVersion + - expirationTimeMillis + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'userName', None, None, ), # 1 + (2, TType.STRING, 'topologyId', None, None, ), # 2 + (3, TType.I64, 'secretVersion', None, None, ), # 3 + (4, TType.I64, 'expirationTimeMillis', None, None, ), # 4 + ) + + def __init__(self, userName=None, topologyId=None, secretVersion=None, expirationTimeMillis=None,): + self.userName = userName + self.topologyId = topologyId + self.secretVersion = secretVersion + self.expirationTimeMillis = expirationTimeMillis + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.userName = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.topologyId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.secretVersion = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.expirationTimeMillis = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('WorkerTokenInfo') + if self.userName is not None: + oprot.writeFieldBegin('userName', TType.STRING, 1) + oprot.writeString(self.userName.encode('utf-8')) + oprot.writeFieldEnd() + if self.topologyId is not None: + oprot.writeFieldBegin('topologyId', TType.STRING, 2) + oprot.writeString(self.topologyId.encode('utf-8')) + oprot.writeFieldEnd() + if self.secretVersion is not None: + oprot.writeFieldBegin('secretVersion', TType.I64, 3) + oprot.writeI64(self.secretVersion) + oprot.writeFieldEnd() + if self.expirationTimeMillis is not None: + oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 4) + oprot.writeI64(self.expirationTimeMillis) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.userName is None: + raise TProtocol.TProtocolException(message='Required field userName is unset!') + if self.topologyId is None: + raise TProtocol.TProtocolException(message='Required field topologyId is unset!') + if self.secretVersion is None: + raise TProtocol.TProtocolException(message='Required field secretVersion is unset!') + if self.expirationTimeMillis is None: + raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.userName) + value = (value * 31) ^ hash(self.topologyId) + value = (value * 31) ^ hash(self.secretVersion) + value = (value * 31) ^ hash(self.expirationTimeMillis) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class WorkerToken: + """ + Attributes: + - serviceType + - info + - signature + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'serviceType', None, None, ), # 1 + (2, TType.STRING, 'info', None, None, ), # 2 + (3, TType.STRING, 'signature', None, None, ), # 3 + ) + + def __init__(self, serviceType=None, info=None, signature=None,): + self.serviceType = serviceType + self.info = info + self.signature = signature + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.serviceType = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.info = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.signature = iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('WorkerToken') + if self.serviceType is not None: + oprot.writeFieldBegin('serviceType', TType.I32, 1) + oprot.writeI32(self.serviceType) + oprot.writeFieldEnd() + if self.info is not None: + oprot.writeFieldBegin('info', TType.STRING, 2) + oprot.writeString(self.info) + oprot.writeFieldEnd() + if self.signature is not None: + oprot.writeFieldBegin('signature', TType.STRING, 3) + oprot.writeString(self.signature) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.serviceType is None: + raise TProtocol.TProtocolException(message='Required field serviceType is unset!') + if self.info is None: + raise TProtocol.TProtocolException(message='Required field info is unset!') + if self.signature is None: + raise TProtocol.TProtocolException(message='Required field signature is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.serviceType) + value = (value * 31) ^ hash(self.info) + value = (value * 31) ^ hash(self.signature) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class PrivateWorkerKey: + """ + Attributes: + - key + - userName + - expirationTimeMillis + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', None, None, ), # 1 + (2, TType.STRING, 'userName', None, None, ), # 2 + (3, TType.I64, 'expirationTimeMillis', None, None, ), # 3 + ) + + def __init__(self, key=None, userName=None, expirationTimeMillis=None,): + self.key = key + self.userName = userName + self.expirationTimeMillis = expirationTimeMillis + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.userName = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.expirationTimeMillis = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('PrivateWorkerKey') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key) + oprot.writeFieldEnd() + if self.userName is not None: + oprot.writeFieldBegin('userName', TType.STRING, 2) + oprot.writeString(self.userName.encode('utf-8')) + oprot.writeFieldEnd() + if self.expirationTimeMillis is not None: + oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 3) + oprot.writeI64(self.expirationTimeMillis) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.key is None: + raise TProtocol.TProtocolException(message='Required field key is unset!') + if self.userName is None: + raise TProtocol.TProtocolException(message='Required field userName is unset!') + if self.expirationTimeMillis is None: + raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.key) + value = (value * 31) ^ hash(self.userName) + value = (value * 31) ^ hash(self.expirationTimeMillis) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index c5140f3..81b71f3 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -858,4 +858,43 @@ exception HBExecutionException { 1: required string msg; } - +# WorkerTokens are used as credentials that allow a Worker to authenticate with DRPC, Nimbus, or other storm processes that we add in here. +enum WorkerTokenServiceType { + NIMBUS, + DRPC +} + +#This is information that we want to be sure users do not modify in any way... +struct WorkerTokenInfo { + # The user/owner of the topology. So we can authorize based off of a user + 1: required string userName; + # The topology id that this token is a part of. So we can find the right sceret key, and so we can + # authorize based off of a topology if needed. + 2: required string topologyId; + # What version of the secret key to use. If it is too old or we cannot find it, then the token will not be valid. + 3: required i64 secretVersion; + # Unix time stamp in millis when this expires + 4: required i64 expirationTimeMillis; +} + +#This is what we give to worker so they can authenticate with built in daemons +struct WorkerToken { + # What service is this for? + 1: required WorkerTokenServiceType serviceType; + # A serialized version of a WorkerTokenInfo. We double encode it so the bits don't change between a serialzie/deserialize cycle. + 2: required binary info; + # how to prove that info is correct and unmodified when it gets back to us. + 3: required binary signature; +} + +#This is the private information that we can use to verify a WorkerToken is still valid +# The topology id and version number are stored outside of this as the key to look it up. +struct PrivateWorkerKey { + #This is the key itself. An algorithm selection may be added in the future, but for now there is only + # one so don't worry about it. + 1: required binary key; + # Extra sanity check that the user is correct. + 2: required string userName; + # Unix time stamp in millis when this, and any corresponding tokens, expire + 3: required i64 expirationTimeMillis; +} http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java new file mode 100644 index 0000000..dda7def --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.security.auth.DefaultPrincipalToLocal; +import org.apache.storm.utils.ConfigUtils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class DaemonTypeTest { + + @Test + public void getDefaultZkAclsDefaultConf() { + Map<String, Object> conf = ConfigUtils.readStormConfig(); + assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf)); + assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf)); + assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf)); + assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf)); + assertNull(DaemonType.WORKER.getDefaultZkAcls(conf)); + } + + @Test + public void getDefaultZkAclsSecureServerConf() { + Map<String, Object> conf = ConfigUtils.readStormConfig(); + conf.put(Config.STORM_ZOOKEEPER_AUTH_SCHEME, "digest"); + conf.put(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD, "storm:thisisapoorpassword"); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName()); + conf.put(Config.NIMBUS_THRIFT_PORT, 6666); + + assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf)); + assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf)); + assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.SUPERVISOR.getDefaultZkAcls(conf)); + assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.NIMBUS.getDefaultZkAcls(conf)); + assertNull(DaemonType.WORKER.getDefaultZkAcls(conf)); + } + + @Test + public void getDefaultZkAclsSecureWorkerConf() { + Map<String, Object> conf = ConfigUtils.readStormConfig(); + conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest"); + conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, "storm:thisisapoorpassword"); + conf.put(Config.STORM_ZOOKEEPER_SUPERACL, "sasl:nimbus"); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName()); + conf.put(Config.NIMBUS_THRIFT_PORT, 6666); + + assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf)); + assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf)); + assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf)); + assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf)); + List<ACL> expected = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL); + expected.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", "nimbus"))); + assertEquals(expected, DaemonType.WORKER.getDefaultZkAcls(conf)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java index 1bd08b8..bd20d7b 100644 --- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java +++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java @@ -15,27 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.cluster; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.HashMap; +import org.apache.storm.callback.ZKStateChangedCallback; +import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; - -import org.mockito.Mockito; import org.mockito.Matchers; - +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.zookeeper.KeeperException; - -import org.apache.storm.callback.ZKStateChangedCallback; -import org.apache.storm.cluster.ClusterStateContext; - public class StormClusterStateImplTest { private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImplTest.class); @@ -57,7 +49,7 @@ public class StormClusterStateImplTest { public void init() throws Exception { storage = Mockito.mock(IStateStorage.class); context = new ClusterStateContext(); - state = new StormClusterStateImpl(storage, null /*acls*/, context, false /*solo*/); + state = new StormClusterStateImpl(storage, context, false /*solo*/); } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java index 005d415..245261b 100644 --- a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java +++ b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java @@ -17,6 +17,7 @@ */ package org.apache.storm.security.auth; +import org.apache.storm.security.auth.sasl.SaslTransportPlugin; import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 7f72a48..8d66a61 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.command; -import java.util.ArrayList; +import com.google.common.collect.Sets; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.KeyFilter; import org.apache.storm.blobstore.LocalFsBlobStore; @@ -35,13 +35,9 @@ import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; - public class AdminCommands { private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class); @@ -49,7 +45,7 @@ public class AdminCommands { private static IStormClusterState stormClusterState; private static Map<String, Object> conf; - public static void main(String [] args) throws Exception { + public static void main(String [] args) { if (args.length == 0) { throw new IllegalArgumentException("Missing command. Supported command is remove_corrupt_topologies"); @@ -71,26 +67,14 @@ public class AdminCommands { private static void initialize() { conf = Utils.readStormConfig(); nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf)); - List<ACL> acls = null; - if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { - acls = adminZkAcls(); - } try { - stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS)); + stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); } catch (Exception e) { LOG.error("admin can't create stormClusterState"); new RuntimeException(e); } } - // we might think of moving this method in Utils class - private static List<ACL> adminZkAcls() { - final List<ACL> acls = new ArrayList<>(); - acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); - acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); - return acls; - } - private static Set<String> getKeyListFromId( String corruptId) { Set<String> keyLists = new HashSet<>(); keyLists.add(ConfigUtils.masterStormCodeKey(corruptId)); http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java index 668f019..04dc954 100644 --- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java +++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java @@ -43,7 +43,7 @@ public class Heartbeats { String path = args[1]; Map<String, Object> conf = Utils.readStormConfig(); - IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext()); + IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, new ClusterStateContext()); LOG.info("Command: [{}]", command); http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index c6c87c1..1b2f3a2 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -40,13 +40,13 @@ (defn mk-state ([zk-port] (let [conf (mk-config zk-port)] - (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.)))) + (ClusterUtils/mkStateStorage conf conf (ClusterStateContext.)))) ([zk-port cb] (let [ret (mk-state zk-port)] (.register ret cb) ret))) -(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.))) +(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) (ClusterStateContext.))) (defn barr [& vals] @@ -354,12 +354,12 @@ ;; No need for when clauses because we just want to return nil (with-open [_ (MockedClientZookeeper. zk-mock)] (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/any))) (thenReturn curator-frameworke)) - (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.)) + (ClusterUtils/mkStateStorage {} nil (ClusterStateContext.)) (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))) (let [distributed-state-storage (reify IStateStorage (register [this callback] nil) (mkdirs [this path acls] nil)) cluster-utils (Mockito/mock ClusterUtils)] (with-open [mocked-cluster (MockedCluster. cluster-utils)] - (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage)) - (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.)))))) + (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/any))) (thenReturn distributed-state-storage)) + (ClusterUtils/mkStormClusterState {} (ClusterStateContext.)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 03b7388..4a3f2a8 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1319,7 +1319,7 @@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT (.getPort zk) STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) + (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.))) (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) (.launchServer nimbus) (bind topology (Thrift/buildTopology @@ -1331,7 +1331,7 @@ (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))] (letlocals - (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) + (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.))) (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) (.launchServer non-leader-nimbus) @@ -1611,36 +1611,6 @@ ) )) -(deftest test-nimbus-data-acls - (testing "nimbus-data uses correct ACLs" - (let [scheme "digest" - digest "storm:thisisapoorpassword" - auth-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-ZOOKEEPER-AUTH-SCHEME scheme - STORM-ZOOKEEPER-AUTH-PAYLOAD digest - STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal" - NIMBUS-MONITOR-FREQ-SECS 10 - NIMBUS-THRIFT-PORT 6666}) - expected-acls Nimbus/ZK_ACLS - fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil)) - fake-cu (proxy [ServerConfigUtils] [] - (nimbusTopoHistoryStateImpl [conf] nil)) - fake-utils (proxy [Utils] [] - (makeUptimeComputer [] (proxy [Utils$UptimeComputer] [] - (upTime [] 0)))) - cluster-utils (Mockito/mock ClusterUtils) - fake-common (proxy [StormCommon] [] - (mkAuthorizationHandler [_] nil))] - (with-open [_ (ServerConfigUtilsInstaller. fake-cu) - _ (UtilsInstaller. fake-utils) - - (StormCommonInstaller. fake-common) - zk-le (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf zk blob-store tc] nil))) - mocked-cluster (MockedCluster. cluster-utils)] - (mk-nimbus auth-conf fake-inimbus) - (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any)) - )))) - (deftest test-file-bogus-download (with-open [cluster (.build (doto (LocalCluster$Builder. ) @@ -1679,7 +1649,7 @@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT (.getPort zk) STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) + (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.))) (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) (.launchServer nimbus) (Time/sleepSecs 1) @@ -1715,7 +1685,7 @@ STORM-ZOOKEEPER-PORT (.getPort zk) STORM-LOCAL-DIR nimbus-dir NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)})) - (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) + (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.))) (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) (.launchServer nimbus) (bind notifier (InMemoryTopologyActionNotifier.)) @@ -1848,21 +1818,26 @@ (defn teardown-heartbeats [id]) (defn teardown-topo-errors [id]) (defn teardown-backpressure-dirs [id]) +(defn teardown-wt-dirs [id]) (defn mock-cluster-state ([] (mock-cluster-state nil nil)) ([active-topos inactive-topos] - (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos)) + (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos inactive-topos)) ([active-topos hb-topos error-topos bp-topos] + (mock-cluster-state active-topos hb-topos error-topos bp-topos nil)) + ([active-topos hb-topos error-topos bp-topos wt-topos] (reify IStormClusterState (teardownHeartbeats [this id] (teardown-heartbeats id)) (teardownTopologyErrors [this id] (teardown-topo-errors id)) (removeBackpressure [this id] (teardown-backpressure-dirs id)) + (removeAllPrivateWorkerKeys [this id] (teardown-wt-dirs id)) (activeStorms [this] active-topos) (heartbeatStorms [this] hb-topos) (errorTopologies [this] error-topos) - (backpressureTopologies [this] bp-topos)))) + (backpressureTopologies [this] bp-topos) + (idsOfTopologiesWithPrivateWorkerKeys [this] (into #{} wt-topos))))) (deftest cleanup-storm-ids-returns-inactive-topos (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3")) http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj deleted file mode 100644 index a77849a..0000000 --- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj +++ /dev/null @@ -1,474 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.security.auth.auth-test - (:use [clojure test]) - (:import [org.apache.thrift TException] - [org.json.simple JSONValue] - [org.apache.storm.security.auth.authorizer ImpersonationAuthorizer] - [java.net Inet4Address]) - (:import [org.apache.storm.blobstore BlobStore]) - (:import [org.apache.thrift.transport TTransportException]) - (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) - (:import [org.apache.storm.nimbus ILeaderElector]) - (:import [org.apache.storm.cluster IStormClusterState]) - (:import [org.mockito Mockito]) - (:import [org.apache.storm.zookeeper Zookeeper]) - (:import [java.nio ByteBuffer]) - (:import [java.security Principal AccessController]) - (:import [javax.security.auth Subject]) - (:import [java.net InetAddress]) - (:import [org.apache.storm Config Testing Testing$Condition DaemonConfig]) - (:import [org.apache.storm.generated AuthorizationException]) - (:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus]) - (:import [org.apache.storm.utils NimbusClient Time]) - (:import [org.apache.storm.security.auth FixedGroupsMapping FixedGroupsMapping]) - (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer]) - (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping - ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType]) - (:import [org.apache.storm.daemon StormCommon]) - (:use [org.apache.storm util daemon-config config]) - (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions - KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor] - (org.json.simple JSONValue)) - (:import [org.apache.storm.utils ConfigUtils Utils])) - -(defn mk-principal [name] - (reify Principal - (equals [this other] - (= name (.getName other))) - (getName [this] name) - (toString [this] name) - (hashCode [this] (.hashCode name)))) - -(defn mk-subject [name] - (Subject. true #{(mk-principal name)} #{} #{})) - -;; 3 seconds in milliseconds -;; This is plenty of time for a thrift client to respond. -(def nimbus-timeout (Integer. (* 3 1000))) - -(defn nimbus-data [storm-conf inimbus] - (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf zk blob-store tc] (Mockito/mock ILeaderElector))))] - (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil))) - -(defn dummy-service-handler - ([conf inimbus auth-context] - (let [nimbus-d (nimbus-data conf inimbus) - topo-conf (atom nil)] - (reify Nimbus$Iface - (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology - ^SubmitOptions submitOptions] - (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (if serializedConf (clojurify-structure (JSONValue/parse serializedConf))))) - (.checkAuthorization nimbus-d storm-name @topo-conf "submitTopology" auth-context)) - - (^void killTopology [this ^String storm-name] - (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context)) - - (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options] - (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context)) - - (^void rebalance [this ^String storm-name ^RebalanceOptions options] - (.checkAuthorization nimbus-d storm-name @topo-conf "rebalance" auth-context)) - - (activate [this storm-name] - (.checkAuthorization nimbus-d storm-name @topo-conf "activate" auth-context)) - - (deactivate [this storm-name] - (.checkAuthorization nimbus-d storm-name @topo-conf "deactivate" auth-context)) - - (uploadNewCredentials [this storm-name creds] - (.checkAuthorization nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context)) - - (beginFileUpload [this]) - - (^void uploadChunk [this ^String location ^ByteBuffer chunk]) - - (^void finishFileUpload [this ^String location]) - - (^String beginFileDownload [this ^String file] - (.checkAuthorization nimbus-d nil nil "fileDownload" auth-context) - "Done!") - - (^ByteBuffer downloadChunk [this ^String id]) - - (^String getNimbusConf [this]) - - (^String getTopologyConf [this ^String id]) - - (^StormTopology getTopology [this ^String id]) - - (^StormTopology getUserTopology [this ^String id]) - - (^ClusterSummary getClusterInfo [this]) - - (^TopologyInfo getTopologyInfo [this ^String storm-id])))) - ([conf inimbus] - (dummy-service-handler conf inimbus nil))) - - -(defn launch-server [login-cfg aznClass transportPluginClass serverConf] - (let [conf1 (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {NIMBUS-AUTHORIZER aznClass - NIMBUS-THRIFT-PORT 0 - STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass}) - conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1) - conf (if serverConf (merge conf2 serverConf) conf2) - nimbus (Nimbus$StandaloneINimbus.) - service-handler (dummy-service-handler conf nimbus) - server (ThriftServer. - conf - (Nimbus$Processor. service-handler) - ThriftConnectionType/NIMBUS)] - (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server)))) - (.start (Thread. #(.serve server))) - (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing server)))) (fn [] (Time/sleep 100))) - server )) - -(defmacro with-server [[server-sym & args] & body] - `(let [~server-sym (launch-server ~@args)] - ~@body - (.stop ~server-sym) - )) - -(deftest kerb-to-local-test - (let [kptol (KerberosPrincipalToLocal. )] - (.prepare kptol {}) - (is (= "me" (.toLocal kptol (mk-principal "me@realm")))) - (is (= "simple" (.toLocal kptol (mk-principal "simple")))) - (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm")))))) - -(deftest Simple-authentication-test - (with-server [server nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (.activate nimbus_client "security_auth_test_topology") - (.close client)) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf" - STORM-NIMBUS-RETRY-TIMES 0})] - (testing "(Negative authentication) Server: Simple vs. Client: Digest" - (is (thrown-cause? org.apache.thrift.transport.TTransportException - (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout))))))) - -(deftest negative-whitelist-authorization-test - (with-server [server nil - "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" - "org.apache.storm.testing.SingleUserSimpleTransport" nil] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Negative authorization) Authorization plugin should reject client request" - (is (thrown-cause? AuthorizationException - (.activate nimbus_client "security_auth_test_topology")))) - (.close client)))) - -(deftest positive-whitelist-authorization-test - (with-server [server nil - "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" - "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Positive authorization) Authorization plugin should accept client request" - (.activate nimbus_client "security_auth_test_topology")) - (.close client)))) - -(deftest simple-acl-user-auth-test - (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {NIMBUS-ADMINS ["admin"] - NIMBUS-SUPERVISOR-USERS ["supervisor"]}) - authorizer (SimpleACLAuthorizer. ) - admin-user (mk-subject "admin") - supervisor-user (mk-subject "supervisor") - user-a (mk-subject "user-a") - user-b (mk-subject "user-b")] - (.prepare authorizer cluster-conf) - (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {}))) - (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "submitTopology" {}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "fileUpload" nil))) - (is (= true (.permit authorizer (ReqContext. user-b) "fileUpload" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "fileUpload" nil))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getNimbusConf" nil))) - (is (= true (.permit authorizer (ReqContext. user-b) "getNimbusConf" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getNimbusConf" nil))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getClusterInfo" nil))) - (is (= true (.permit authorizer (ReqContext. user-b) "getClusterInfo" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getClusterInfo" nil))) - - (is (= false (.permit authorizer (ReqContext. user-a) "fileDownload" nil))) - (is (= false (.permit authorizer (ReqContext. user-b) "fileDownload" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil))) - (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil))) - - (is (= true (.permit authorizer (ReqContext. user-a) "killTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "killTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "killTopolgy" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "rebalance" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "rebalance" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "rebalance" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "activate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "activate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "activate" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "deactivate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "deactivate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "deactivate" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "getTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopology" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getUserTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "getUserTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]}))) - - (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]}))) - (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]}))) -)) - -(deftest simple-acl-nimbus-users-auth-test - (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {NIMBUS-ADMINS ["admin"] - NIMBUS-SUPERVISOR-USERS ["supervisor"] - NIMBUS-USERS ["user-a"]}) - authorizer (SimpleACLAuthorizer. ) - admin-user (mk-subject "admin") - supervisor-user (mk-subject "supervisor") - user-a (mk-subject "user-a") - user-b (mk-subject "user-b")] - (.prepare authorizer cluster-conf) - (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {}))) - (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) - (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil))))) - -(deftest simple-acl-nimbus-groups-auth-test - (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {NIMBUS-ADMINS-GROUPS ["admin-group"] - NIMBUS-USERS ["user-a"] - NIMBUS-SUPERVISOR-USERS ["supervisor"] - STORM-GROUP-MAPPING-SERVICE-PROVIDER-PLUGIN "org.apache.storm.security.auth.FixedGroupsMapping" - STORM-GROUP-MAPPING-SERVICE-PARAMS {FixedGroupsMapping/STORM_FIXED_GROUP_MAPPING - {"admin" #{"admin-group"} - "not-admin" #{"not-admin-group"}}}}) - authorizer (SimpleACLAuthorizer. ) - admin-user (mk-subject "admin") - not-admin-user (mk-subject "not-admin") - supervisor-user (mk-subject "supervisor") - user-a (mk-subject "user-a") - user-b (mk-subject "user-b")] - (.prepare authorizer cluster-conf) - (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {}))) - (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) - (is (= false (.permit authorizer (ReqContext. not-admin-user) "fileUpload" nil))) - (is (= false (.permit authorizer (ReqContext. user-b) "fileUpload" nil))) - (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil))))) - -(deftest shell-based-groups-mapping-test - (let [cluster-conf (clojurify-structure (ConfigUtils/readStormConfig)) - groups (ShellBasedGroupsMapping. ) - user-name (System/getProperty "user.name")] - (.prepare groups cluster-conf) - (is (<= 0 (.size (.getGroups groups user-name)))) - (is (= 0 (.size (.getGroups groups "userDoesNotExist")))) - (is (= 0 (.size (.getGroups groups nil)))))) - -(deftest simple-acl-same-user-auth-test - (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {NIMBUS-ADMINS ["admin"] - NIMBUS-SUPERVISOR-USERS ["admin"]}) - authorizer (SimpleACLAuthorizer. ) - admin-user (mk-subject "admin")] - (.prepare authorizer cluster-conf) - (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil))) - (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]}))) - (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]}))) -)) - - -(deftest positive-authorization-test - (with-server [server nil - "org.apache.storm.security.auth.authorizer.NoopAuthorizer" - "org.apache.storm.security.auth.SimpleTransportPlugin" nil] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Positive authorization) Authorization plugin should accept client request" - (.activate nimbus_client "security_auth_test_topology")) - (.close client)))) - -(deftest deny-authorization-test - (with-server [server nil - "org.apache.storm.security.auth.authorizer.DenyAuthorizer" - "org.apache.storm.security.auth.SimpleTransportPlugin" nil] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin" - Config/NIMBUS_THRIFT_PORT (.getPort server) - DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Negative authorization) Authorization plugin should reject client request" - (is (thrown-cause? AuthorizationException - (.activate nimbus_client "security_auth_test_topology")))) - (.close client)))) - -(deftest digest-authentication-test - (with-server [server - "test/clj/org/apache/storm/security/auth/jaas_digest.conf" - nil - "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil] - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf" - STORM-NIMBUS-RETRY-TIMES 0}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Positive authentication) valid digest authentication" - (.activate nimbus_client "security_auth_test_topology")) - (.close client)) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin" - STORM-NIMBUS-RETRY-TIMES 0}) - client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout) - nimbus_client (.getClient client)] - (testing "(Negative authentication) Server: Digest vs. Client: Simple" - (is (thrown-cause? org.apache.thrift.transport.TTransportException - (.activate nimbus_client "security_auth_test_topology")))) - (.close client)) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf" - STORM-NIMBUS-RETRY-TIMES 0})] - (testing "(Negative authentication) Invalid password" - (is (thrown-cause? TTransportException - (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout))))) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf" - STORM-NIMBUS-RETRY-TIMES 0})] - (testing "(Negative authentication) Unknown user" - (is (thrown-cause? TTransportException - (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout))))) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf" - STORM-NIMBUS-RETRY-TIMES 0})] - (testing "(Negative authentication) nonexistent configuration file" - (is (thrown-cause? RuntimeException - (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout))))) - - (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" - "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf" - STORM-NIMBUS-RETRY-TIMES 0})] - (testing "(Negative authentication) Missing client" - (is (thrown-cause? java.io.IOException - (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout))))))) - -(deftest test-GetTransportPlugin-throws-RuntimeException - (let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})] - (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil))))) - -(defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address] - (let [impersonating-principal (mk-principal impersonating-user) - principal-being-impersonated (mk-principal user-being-impersonated) - subject (Subject. true #{principal-being-impersonated} #{} #{}) - req_context (ReqContext. subject)] - (.setRemoteAddress req_context remote-address) - (.setRealPrincipal req_context impersonating-principal) - req_context)) - -(deftest impersonation-authorizer-test - (let [impersonating-user "admin" - user-being-impersonated (System/getProperty "user.name") - groups (ShellBasedGroupsMapping.) - _ (.prepare groups (clojurify-structure (ConfigUtils/readStormConfig))) - groups (.getGroups groups user-being-impersonated) - cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))] - "groups" groups}}}) - authorizer (ImpersonationAuthorizer. ) - unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10") - ] - - (.prepare authorizer cluster-conf) - ;;non impersonating request, should be permitted. - (is (= true (.permit authorizer (ReqContext. (mk-subject "anyuser")) "fileUpload" nil))) - - ;;user with no impersonation acl should be reject - (is (= false (.permit authorizer (mk-impersonating-req-context "user-with-no-acl" user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil))) - - ;;request from hosts that are not authorized should be rejected, commented because - (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated unauthorized-host) "someOperation" nil))) - - ;;request to impersonate users from unauthroized groups should be rejected. - (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user "unauthroized-user" (InetAddress/getLocalHost)) "someOperation" nil))) - - ;;request from authorized hosts and group should be allowed. - (is (= true (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil))))) http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf deleted file mode 100644 index 149db3f..0000000 --- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* This sample file containes incorrect password of a user. - We use this file for negative test. -*/ -StormServer { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="adminsecret" - user_bob="bobsecret"; -}; -StormClient { - org.apache.zookeeper.server.auth.DigestLoginModule required - username="bob" - password="bad_password"; -}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf deleted file mode 100644 index f4f2b64..0000000 --- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -StormServer { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="adminsecret" - user_bob="bobsecret"; -}; http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf deleted file mode 100644 index e03a333..0000000 --- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* This sample file containes an unauthorized user. - We use this file for negative test. -*/ -StormServer { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="adminsecret" - user_bob="bobsecret"; -}; -StormClient { - org.apache.zookeeper.server.auth.DigestLoginModule required - username="unknown_user" - password="some_password"; -}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj index 4b2d085..abc1579 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj @@ -15,7 +15,6 @@ ;; limitations under the License. (ns org.apache.storm.security.auth.nimbus-auth-test (:use [clojure test]) - (:require [org.apache.storm.security.auth [auth-test :refer [nimbus-timeout]]]) (:import [java.nio ByteBuffer]) (:import [java.util Optional]) (:import [org.apache.storm LocalCluster$Builder DaemonConfig Config]) @@ -33,6 +32,10 @@ (:require [conjure.core]) (:use [conjure core])) +;; 3 seconds in milliseconds +;; This is plenty of time for a thrift client to respond. +(def nimbus-timeout (Integer. (* 3 1000))) + (defn to-conf [nimbus-port login-cfg aznClass transportPluginClass] (let [conf {NIMBUS-AUTHORIZER aznClass NIMBUS-THRIFT-PORT nimbus-port http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/pom.xml ---------------------------------------------------------------------- diff --git a/storm-server/pom.xml b/storm-server/pom.xml index cfe2d74..fb175b5 100644 --- a/storm-server/pom.xml +++ b/storm-server/pom.xml @@ -134,7 +134,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>2617</maxAllowedViolations> + <maxAllowedViolations>2585</maxAllowedViolations> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index 3230f70..d03bfcf 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -1075,6 +1075,12 @@ public class DaemonConfig implements Validated { @isInteger public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = "storm.metricstore.rocksdb.deletion_period_hours"; + /** + * The number of hours a worker token is valid for. This also sets how frequently worker tokens will be renewed. + */ + @isPositiveNumber + public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = "storm.worker.token.life.time.hours"; + // VALIDATION ONLY CONFIGS // Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate // That they reference a valid class. To allow this to happen we do part of the validation on the client side with annotations on http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 20a46a3..39c8d57 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -35,6 +35,7 @@ import java.util.function.UnaryOperator; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.IStateStorage; import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.daemon.Acker; @@ -436,10 +437,10 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { this.daemonConf = new HashMap<>(conf); this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin); - ClusterStateContext cs = new ClusterStateContext(); - this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs); + ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf); + this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs); if (builder.clusterState == null) { - clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs); + clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, cs); } else { this.clusterState = builder.clusterState; } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/Testing.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java index d50f792..1b3bd8e 100644 --- a/storm-server/src/main/java/org/apache/storm/Testing.java +++ b/storm-server/src/main/java/org/apache/storm/Testing.java @@ -104,7 +104,7 @@ public class Testing { /** * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has * passed - * @param the number of ms to wait before timing out. + * @param timeoutMs the number of ms to wait before timing out. * @param condition what we are waiting for * @param body what to run in the loop * @throws AssertionError if teh loop timed out. @@ -112,9 +112,11 @@ public class Testing { public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) { long endTime = System.currentTimeMillis() + timeoutMs; LOG.debug("Looping until {}", condition); + int count = 0; while (condition.exec()) { + count++; if (System.currentTimeMillis() > endTime) { - LOG.info("Condition {} not met in {} ms", condition, timeoutMs); + LOG.info("Condition {} not met in {} ms after calling {} times", condition, timeoutMs, count); LOG.info(Utils.threadDump()); throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition); } http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index f21f455..4e2d8fc 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -128,6 +128,9 @@ import org.apache.storm.generated.WorkerMetricPoint; import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.generated.WorkerResources; import org.apache.storm.generated.WorkerSummary; +import org.apache.storm.generated.WorkerToken; +import org.apache.storm.generated.WorkerTokenInfo; +import org.apache.storm.generated.WorkerTokenServiceType; import org.apache.storm.logging.ThriftAccessLogger; import org.apache.storm.metric.ClusterMetricsConsumerExecutor; import org.apache.storm.metric.StormMetricsRegistry; @@ -170,6 +173,7 @@ import org.apache.storm.security.auth.NimbusPrincipal; import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.ThriftConnectionType; import org.apache.storm.security.auth.ThriftServer; +import org.apache.storm.security.auth.workertoken.WorkerTokenManager; import org.apache.storm.stats.StatsUtil; import org.apache.storm.utils.BufferInputStream; import org.apache.storm.utils.ConfigUtils; @@ -189,8 +193,6 @@ import org.apache.storm.validation.ConfigValidation; import org.apache.storm.zookeeper.ClientZookeeper; import org.apache.storm.zookeeper.Zookeeper; import org.apache.thrift.TException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -241,17 +243,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final String STORM_VERSION = VersionInfo.getVersion(); - @VisibleForTesting - public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0), - new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE)); - private static final Subject NIMBUS_SUBJECT = new Subject(); static { NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal()); NIMBUS_SUBJECT.setReadOnly(); } - + // TOPOLOGY STATE TRANSITIONS private static StormBase make(TopologyStatus status) { StormBase ret = new StormBase(); @@ -767,6 +765,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { ret.addAll(Utils.OR(state.errorTopologies(), EMPTY_STRING_LIST)); ret.addAll(Utils.OR(store.storedTopoIds(), EMPTY_STRING_SET)); ret.addAll(Utils.OR(state.backpressureTopologies(), EMPTY_STRING_LIST)); + ret.addAll(Utils.OR(state.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET)); ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST)); return ret; } @@ -1013,11 +1012,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { nimbus.shutdown(); server.stop(); }, 10); + if (AuthUtils.areWorkerTokensEnabledServer(server, conf)) { + nimbus.initWorkerTokenManager(); + } LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION); server.serve(); return nimbus; } - + public static Nimbus launch(INimbus inimbus) throws Exception { Map<String, Object> conf = Utils.merge(Utils.readStormConfig(), ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false)); @@ -1074,6 +1076,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors; private final IGroupMappingServiceProvider groupMapper; private final IPrincipalToLocal principalToLocal; + //May be null if worker tokens are not supported by the thrift transport. + private WorkerTokenManager workerTokenManager; private static CuratorFramework makeZKClient(Map<String, Object> conf) { List<String> servers = (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS); @@ -1087,11 +1091,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception { - List<ACL> acls = null; - if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { - acls = ZK_ACLS; - } - return ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS)); + return ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); } public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception { @@ -1203,6 +1203,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return topoCache; } + @VisibleForTesting + void initWorkerTokenManager() { + if (workerTokenManager == null) { + workerTokenManager = new WorkerTokenManager(conf, getStormClusterState()); + } + } + private boolean isLeader() throws Exception { return leaderElector.isLeader(); } @@ -2145,6 +2152,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { state.teardownHeartbeats(topoId); state.teardownTopologyErrors(topoId); state.removeBackpressure(topoId); + state.removeAllPrivateWorkerKeys(topoId); rmDependencyJarsInTopology(topoId); forceDeleteTopoDistDir(topoId); rmTopologyKeys(topoId); @@ -2254,14 +2262,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } IStormClusterState state = stormClusterState; Collection<ICredentialsRenewer> renewers = credRenewers; - Object lock = credUpdateLock; Map<String, StormBase> assignedBases = state.topologyBases(); if (assignedBases != null) { for (Entry<String, StormBase> entry: assignedBases.entrySet()) { String id = entry.getKey(); String ownerPrincipal = entry.getValue().get_principal(); Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache))); - synchronized(lock) { + synchronized(credUpdateLock) { Credentials origCreds = state.credentials(id, null); if (origCreds != null) { Map<String, String> origCredsMap = origCreds.get_creds(); @@ -2270,6 +2277,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { LOG.info("Renewing Creds For {} with {} owned by {}", id, renewer, ownerPrincipal); renewer.renew(newCredsMap, topoConf, ownerPrincipal); } + //Update worker tokens if needed + upsertWorkerTokensInCreds(newCredsMap, ownerPrincipal, id); if (!newCredsMap.equals(origCredsMap)) { state.setCredentials(id, new Credentials(newCredsMap), topoConf); } @@ -2639,6 +2648,34 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } + private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) { + if (workerTokenManager != null) { + final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal(); + for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) { + boolean shouldAdd = true; + WorkerToken oldToken = AuthUtils.readWorkerToken(creds, type); + if (oldToken != null) { + try { + WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(oldToken); + if (info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() > renewIfExpirationBefore) { + //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new + // token. + shouldAdd = false; + } + } catch (Exception e) { + //The old token could not be deserialized. This is bad, but we are going to replace it anyways so just keep going. + LOG.error("Could not deserialize token info", e); + } + } + if (shouldAdd) { + AuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId)); + } + } + //Remove any expired keys after possibly inserting new ones. + stormClusterState.removeExpiredPrivateWorkerKeys(topologyId); + } + } + @Override public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) @@ -2731,6 +2768,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { for (INimbusCredentialPlugin autocred: nimbusAutocredPlugins) { autocred.populateCredentials(creds, finalConf); } + upsertWorkerTokensInCreds(creds, topologyPrincipal, topoId); } if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false) && @@ -3110,6 +3148,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } checkAuthorization(topoName, topoConf, "uploadNewCredentials"); synchronized(credUpdateLock) { + //Merge the old credentials so creds nimbus created are not lost. + // And in case the user forgot to upload something important this time. + Credentials origCreds = state.credentials(topoId, null); + if (origCreds != null) { + Map<String, String> mergedCreds = origCreds.get_creds(); + mergedCreds.putAll(credentials.get_creds()); + credentials.set_creds(mergedCreds); + } state.setCredentials(topoId, credentials, topoConf); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index 147a8aa..823f6c8 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -24,13 +24,11 @@ import java.net.UnknownHostException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; - import org.apache.commons.io.FileUtils; import org.apache.storm.DaemonConfig; import org.apache.storm.StormTimer; @@ -49,13 +47,12 @@ import org.apache.storm.messaging.IContext; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ServerConfigUtils; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import org.apache.storm.utils.VersionInfo; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,14 +95,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.heartbeatExecutor = Executors.newFixedThreadPool(1); iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf)); - - List<ACL> acls = null; - if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { - acls = SupervisorUtils.supervisorZkAcls(); - } try { - this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); + this.stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.SUPERVISOR, conf)); } catch (Exception e) { LOG.error("supervisor can't create stormClusterState"); throw Utils.wrapInRuntime(e); @@ -193,7 +185,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } /** - * Launch the supervisor + * Launch the supervisor. */ public void launch() throws Exception { LOG.info("Starting Supervisor with conf {}", conf); @@ -223,7 +215,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } /** - * start distribute supervisor + * start distribute supervisor. */ public void launchDaemon() { LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); @@ -233,7 +225,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { throw new IllegalArgumentException("Cannot start server in local mode!"); } launch(); - Utils.addShutdownHookWithForceKillIn1Sec(() -> {this.close();}); + Utils.addShutdownHookWithForceKillIn1Sec(this::close); registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf); StormMetricsRegistry.startMetricsReporters(conf); } catch (Exception e) { @@ -295,7 +287,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { try { k.forceKill(); long start = Time.currentTimeMillis(); - while(!k.areAllProcessesDead()) { + while (!k.areAllProcessesDead()) { if ((Time.currentTimeMillis() - start) > 10_000) { throw new RuntimeException("Giving up on killing " + k + " after " + (Time.currentTimeMillis() - start) + " ms");