http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index a730c13..98a7ba4 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -62,6 +62,20 @@ class TopologyInitialStatus: "INACTIVE": 2, } +class AccessControlType: + OTHER = 1 + USER = 2 + + _VALUES_TO_NAMES = { + 1: "OTHER", + 2: "USER", + } + + _NAMES_TO_VALUES = { + "OTHER": 1, + "USER": 2, + } + class TopologyStatus: ACTIVE = 1 INACTIVE = 2 @@ -1802,6 +1816,146 @@ class InvalidTopologyException(TException): def __ne__(self, other): return not (self == other) +class KeyNotFoundException(TException): + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'msg', None, None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.msg = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('KeyNotFoundException') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRING, 1) + oprot.writeString(self.msg.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.msg is None: + raise TProtocol.TProtocolException(message='Required field msg is unset!') + return + + + def __str__(self): + return repr(self) + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class KeyAlreadyExistsException(TException): + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'msg', None, None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.msg = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('KeyAlreadyExistsException') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRING, 1) + oprot.writeString(self.msg.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.msg is None: + raise TProtocol.TProtocolException(message='Required field msg is unset!') + return + + + def __str__(self): + return repr(self) + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class TopologySummary: """ Attributes: @@ -7110,6 +7264,458 @@ class SubmitOptions: def __ne__(self, other): return not (self == other) +class AccessControl: + """ + Attributes: + - type + - name + - access + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'type', None, None, ), # 1 + (2, TType.STRING, 'name', None, None, ), # 2 + (3, TType.I32, 'access', None, None, ), # 3 + ) + + def __init__(self, type=None, name=None, access=None,): + self.type = type + self.name = name + self.access = access + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.type = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.name = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.access = iprot.readI32() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('AccessControl') + if self.type is not None: + oprot.writeFieldBegin('type', TType.I32, 1) + oprot.writeI32(self.type) + oprot.writeFieldEnd() + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 2) + oprot.writeString(self.name.encode('utf-8')) + oprot.writeFieldEnd() + if self.access is not None: + oprot.writeFieldBegin('access', TType.I32, 3) + oprot.writeI32(self.access) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.type is None: + raise TProtocol.TProtocolException(message='Required field type is unset!') + if self.access is None: + raise TProtocol.TProtocolException(message='Required field access is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.type) + value = (value * 31) ^ hash(self.name) + value = (value * 31) ^ hash(self.access) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class SettableBlobMeta: + """ + Attributes: + - acl + - replication_factor + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'acl', (TType.STRUCT,(AccessControl, AccessControl.thrift_spec)), None, ), # 1 + (2, TType.I32, 'replication_factor', None, None, ), # 2 + ) + + def __init__(self, acl=None, replication_factor=None,): + self.acl = acl + self.replication_factor = replication_factor + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.acl = [] + (_etype440, _size437) = iprot.readListBegin() + for _i441 in xrange(_size437): + _elem442 = AccessControl() + _elem442.read(iprot) + self.acl.append(_elem442) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.replication_factor = iprot.readI32() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SettableBlobMeta') + if self.acl is not None: + oprot.writeFieldBegin('acl', TType.LIST, 1) + oprot.writeListBegin(TType.STRUCT, len(self.acl)) + for iter443 in self.acl: + iter443.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.replication_factor is not None: + oprot.writeFieldBegin('replication_factor', TType.I32, 2) + oprot.writeI32(self.replication_factor) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.acl is None: + raise TProtocol.TProtocolException(message='Required field acl is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.acl) + value = (value * 31) ^ hash(self.replication_factor) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class ReadableBlobMeta: + """ + Attributes: + - settable + - version + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'settable', (SettableBlobMeta, SettableBlobMeta.thrift_spec), None, ), # 1 + (2, TType.I64, 'version', None, None, ), # 2 + ) + + def __init__(self, settable=None, version=None,): + self.settable = settable + self.version = version + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.settable = SettableBlobMeta() + self.settable.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.version = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ReadableBlobMeta') + if self.settable is not None: + oprot.writeFieldBegin('settable', TType.STRUCT, 1) + self.settable.write(oprot) + oprot.writeFieldEnd() + if self.version is not None: + oprot.writeFieldBegin('version', TType.I64, 2) + oprot.writeI64(self.version) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.settable is None: + raise TProtocol.TProtocolException(message='Required field settable is unset!') + if self.version is None: + raise TProtocol.TProtocolException(message='Required field version is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.settable) + value = (value * 31) ^ hash(self.version) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class ListBlobsResult: + """ + Attributes: + - keys + - session + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'keys', (TType.STRING,None), None, ), # 1 + (2, TType.STRING, 'session', None, None, ), # 2 + ) + + def __init__(self, keys=None, session=None,): + self.keys = keys + self.session = session + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.keys = [] + (_etype447, _size444) = iprot.readListBegin() + for _i448 in xrange(_size444): + _elem449 = iprot.readString().decode('utf-8') + self.keys.append(_elem449) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.session = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ListBlobsResult') + if self.keys is not None: + oprot.writeFieldBegin('keys', TType.LIST, 1) + oprot.writeListBegin(TType.STRING, len(self.keys)) + for iter450 in self.keys: + oprot.writeString(iter450.encode('utf-8')) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.session is not None: + oprot.writeFieldBegin('session', TType.STRING, 2) + oprot.writeString(self.session.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.keys is None: + raise TProtocol.TProtocolException(message='Required field keys is unset!') + if self.session is None: + raise TProtocol.TProtocolException(message='Required field session is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.keys) + value = (value * 31) ^ hash(self.session) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class BeginDownloadResult: + """ + Attributes: + - version + - session + - data_size + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'version', None, None, ), # 1 + (2, TType.STRING, 'session', None, None, ), # 2 + (3, TType.I64, 'data_size', None, None, ), # 3 + ) + + def __init__(self, version=None, session=None, data_size=None,): + self.version = version + self.session = session + self.data_size = data_size + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.version = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.session = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.data_size = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BeginDownloadResult') + if self.version is not None: + oprot.writeFieldBegin('version', TType.I64, 1) + oprot.writeI64(self.version) + oprot.writeFieldEnd() + if self.session is not None: + oprot.writeFieldBegin('session', TType.STRING, 2) + oprot.writeString(self.session.encode('utf-8')) + oprot.writeFieldEnd() + if self.data_size is not None: + oprot.writeFieldBegin('data_size', TType.I64, 3) + oprot.writeI64(self.data_size) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.version is None: + raise TProtocol.TProtocolException(message='Required field version is unset!') + if self.session is None: + raise TProtocol.TProtocolException(message='Required field session is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.version) + value = (value * 31) ^ hash(self.session) + value = (value * 31) ^ hash(self.data_size) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class SupervisorInfo: """ Attributes: @@ -7175,31 +7781,31 @@ class SupervisorInfo: elif fid == 4: if ftype == TType.LIST: self.used_ports = [] - (_etype440, _size437) = iprot.readListBegin() - for _i441 in xrange(_size437): - _elem442 = iprot.readI64() - self.used_ports.append(_elem442) + (_etype454, _size451) = iprot.readListBegin() + for _i455 in xrange(_size451): + _elem456 = iprot.readI64() + self.used_ports.append(_elem456) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.LIST: self.meta = [] - (_etype446, _size443) = iprot.readListBegin() - for _i447 in xrange(_size443): - _elem448 = iprot.readI64() - self.meta.append(_elem448) + (_etype460, _size457) = iprot.readListBegin() + for _i461 in xrange(_size457): + _elem462 = iprot.readI64() + self.meta.append(_elem462) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 6: if ftype == TType.MAP: self.scheduler_meta = {} - (_ktype450, _vtype451, _size449 ) = iprot.readMapBegin() - for _i453 in xrange(_size449): - _key454 = iprot.readString().decode('utf-8') - _val455 = iprot.readString().decode('utf-8') - self.scheduler_meta[_key454] = _val455 + (_ktype464, _vtype465, _size463 ) = iprot.readMapBegin() + for _i467 in xrange(_size463): + _key468 = iprot.readString().decode('utf-8') + _val469 = iprot.readString().decode('utf-8') + self.scheduler_meta[_key468] = _val469 iprot.readMapEnd() else: iprot.skip(ftype) @@ -7216,11 +7822,11 @@ class SupervisorInfo: elif fid == 9: if ftype == TType.MAP: self.resources_map = {} - (_ktype457, _vtype458, _size456 ) = iprot.readMapBegin() - for _i460 in xrange(_size456): - _key461 = iprot.readString().decode('utf-8') - _val462 = iprot.readDouble() - self.resources_map[_key461] = _val462 + (_ktype471, _vtype472, _size470 ) = iprot.readMapBegin() + for _i474 in xrange(_size470): + _key475 = iprot.readString().decode('utf-8') + _val476 = iprot.readDouble() + self.resources_map[_key475] = _val476 iprot.readMapEnd() else: iprot.skip(ftype) @@ -7249,23 +7855,23 @@ class SupervisorInfo: if self.used_ports is not None: oprot.writeFieldBegin('used_ports', TType.LIST, 4) oprot.writeListBegin(TType.I64, len(self.used_ports)) - for iter463 in self.used_ports: - oprot.writeI64(iter463) + for iter477 in self.used_ports: + oprot.writeI64(iter477) oprot.writeListEnd() oprot.writeFieldEnd() if self.meta is not None: oprot.writeFieldBegin('meta', TType.LIST, 5) oprot.writeListBegin(TType.I64, len(self.meta)) - for iter464 in self.meta: - oprot.writeI64(iter464) + for iter478 in self.meta: + oprot.writeI64(iter478) oprot.writeListEnd() oprot.writeFieldEnd() if self.scheduler_meta is not None: oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6) oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta)) - for kiter465,viter466 in self.scheduler_meta.items(): - oprot.writeString(kiter465.encode('utf-8')) - oprot.writeString(viter466.encode('utf-8')) + for kiter479,viter480 in self.scheduler_meta.items(): + oprot.writeString(kiter479.encode('utf-8')) + oprot.writeString(viter480.encode('utf-8')) oprot.writeMapEnd() oprot.writeFieldEnd() if self.uptime_secs is not None: @@ -7279,9 +7885,9 @@ class SupervisorInfo: if self.resources_map is not None: oprot.writeFieldBegin('resources_map', TType.MAP, 9) oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources_map)) - for kiter467,viter468 in self.resources_map.items(): - oprot.writeString(kiter467.encode('utf-8')) - oprot.writeDouble(viter468) + for kiter481,viter482 in self.resources_map.items(): + oprot.writeString(kiter481.encode('utf-8')) + oprot.writeDouble(viter482) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -7353,10 +7959,10 @@ class NodeInfo: elif fid == 2: if ftype == TType.SET: self.port = set() - (_etype472, _size469) = iprot.readSetBegin() - for _i473 in xrange(_size469): - _elem474 = iprot.readI64() - self.port.add(_elem474) + (_etype486, _size483) = iprot.readSetBegin() + for _i487 in xrange(_size483): + _elem488 = iprot.readI64() + self.port.add(_elem488) iprot.readSetEnd() else: iprot.skip(ftype) @@ -7377,8 +7983,8 @@ class NodeInfo: if self.port is not None: oprot.writeFieldBegin('port', TType.SET, 2) oprot.writeSetBegin(TType.I64, len(self.port)) - for iter475 in self.port: - oprot.writeI64(iter475) + for iter489 in self.port: + oprot.writeI64(iter489) oprot.writeSetEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -7559,57 +8165,57 @@ class Assignment: elif fid == 2: if ftype == TType.MAP: self.node_host = {} - (_ktype477, _vtype478, _size476 ) = iprot.readMapBegin() - for _i480 in xrange(_size476): - _key481 = iprot.readString().decode('utf-8') - _val482 = iprot.readString().decode('utf-8') - self.node_host[_key481] = _val482 + (_ktype491, _vtype492, _size490 ) = iprot.readMapBegin() + for _i494 in xrange(_size490): + _key495 = iprot.readString().decode('utf-8') + _val496 = iprot.readString().decode('utf-8') + self.node_host[_key495] = _val496 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.MAP: self.executor_node_port = {} - (_ktype484, _vtype485, _size483 ) = iprot.readMapBegin() - for _i487 in xrange(_size483): - _key488 = [] - (_etype493, _size490) = iprot.readListBegin() - for _i494 in xrange(_size490): - _elem495 = iprot.readI64() - _key488.append(_elem495) + (_ktype498, _vtype499, _size497 ) = iprot.readMapBegin() + for _i501 in xrange(_size497): + _key502 = [] + (_etype507, _size504) = iprot.readListBegin() + for _i508 in xrange(_size504): + _elem509 = iprot.readI64() + _key502.append(_elem509) iprot.readListEnd() - _val489 = NodeInfo() - _val489.read(iprot) - self.executor_node_port[_key488] = _val489 + _val503 = NodeInfo() + _val503.read(iprot) + self.executor_node_port[_key502] = _val503 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.MAP: self.executor_start_time_secs = {} - (_ktype497, _vtype498, _size496 ) = iprot.readMapBegin() - for _i500 in xrange(_size496): - _key501 = [] - (_etype506, _size503) = iprot.readListBegin() - for _i507 in xrange(_size503): - _elem508 = iprot.readI64() - _key501.append(_elem508) + (_ktype511, _vtype512, _size510 ) = iprot.readMapBegin() + for _i514 in xrange(_size510): + _key515 = [] + (_etype520, _size517) = iprot.readListBegin() + for _i521 in xrange(_size517): + _elem522 = iprot.readI64() + _key515.append(_elem522) iprot.readListEnd() - _val502 = iprot.readI64() - self.executor_start_time_secs[_key501] = _val502 + _val516 = iprot.readI64() + self.executor_start_time_secs[_key515] = _val516 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.MAP: self.worker_resources = {} - (_ktype510, _vtype511, _size509 ) = iprot.readMapBegin() - for _i513 in xrange(_size509): - _key514 = NodeInfo() - _key514.read(iprot) - _val515 = WorkerResources() - _val515.read(iprot) - self.worker_resources[_key514] = _val515 + (_ktype524, _vtype525, _size523 ) = iprot.readMapBegin() + for _i527 in xrange(_size523): + _key528 = NodeInfo() + _key528.read(iprot) + _val529 = WorkerResources() + _val529.read(iprot) + self.worker_resources[_key528] = _val529 iprot.readMapEnd() else: iprot.skip(ftype) @@ -7630,39 +8236,39 @@ class Assignment: if self.node_host is not None: oprot.writeFieldBegin('node_host', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host)) - for kiter516,viter517 in self.node_host.items(): - oprot.writeString(kiter516.encode('utf-8')) - oprot.writeString(viter517.encode('utf-8')) + for kiter530,viter531 in self.node_host.items(): + oprot.writeString(kiter530.encode('utf-8')) + oprot.writeString(viter531.encode('utf-8')) oprot.writeMapEnd() oprot.writeFieldEnd() if self.executor_node_port is not None: oprot.writeFieldBegin('executor_node_port', TType.MAP, 3) oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port)) - for kiter518,viter519 in self.executor_node_port.items(): - oprot.writeListBegin(TType.I64, len(kiter518)) - for iter520 in kiter518: - oprot.writeI64(iter520) + for kiter532,viter533 in self.executor_node_port.items(): + oprot.writeListBegin(TType.I64, len(kiter532)) + for iter534 in kiter532: + oprot.writeI64(iter534) oprot.writeListEnd() - viter519.write(oprot) + viter533.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.executor_start_time_secs is not None: oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4) oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs)) - for kiter521,viter522 in self.executor_start_time_secs.items(): - oprot.writeListBegin(TType.I64, len(kiter521)) - for iter523 in kiter521: - oprot.writeI64(iter523) + for kiter535,viter536 in self.executor_start_time_secs.items(): + oprot.writeListBegin(TType.I64, len(kiter535)) + for iter537 in kiter535: + oprot.writeI64(iter537) oprot.writeListEnd() - oprot.writeI64(viter522) + oprot.writeI64(viter536) oprot.writeMapEnd() oprot.writeFieldEnd() if self.worker_resources is not None: oprot.writeFieldBegin('worker_resources', TType.MAP, 5) oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.worker_resources)) - for kiter524,viter525 in self.worker_resources.items(): - kiter524.write(oprot) - viter525.write(oprot) + for kiter538,viter539 in self.worker_resources.items(): + kiter538.write(oprot) + viter539.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -7839,11 +8445,11 @@ class StormBase: elif fid == 4: if ftype == TType.MAP: self.component_executors = {} - (_ktype527, _vtype528, _size526 ) = iprot.readMapBegin() - for _i530 in xrange(_size526): - _key531 = iprot.readString().decode('utf-8') - _val532 = iprot.readI32() - self.component_executors[_key531] = _val532 + (_ktype541, _vtype542, _size540 ) = iprot.readMapBegin() + for _i544 in xrange(_size540): + _key545 = iprot.readString().decode('utf-8') + _val546 = iprot.readI32() + self.component_executors[_key545] = _val546 iprot.readMapEnd() else: iprot.skip(ftype) @@ -7871,12 +8477,12 @@ class StormBase: elif fid == 9: if ftype == TType.MAP: self.component_debug = {} - (_ktype534, _vtype535, _size533 ) = iprot.readMapBegin() - for _i537 in xrange(_size533): - _key538 = iprot.readString().decode('utf-8') - _val539 = DebugOptions() - _val539.read(iprot) - self.component_debug[_key538] = _val539 + (_ktype548, _vtype549, _size547 ) = iprot.readMapBegin() + for _i551 in xrange(_size547): + _key552 = iprot.readString().decode('utf-8') + _val553 = DebugOptions() + _val553.read(iprot) + self.component_debug[_key552] = _val553 iprot.readMapEnd() else: iprot.skip(ftype) @@ -7905,9 +8511,9 @@ class StormBase: if self.component_executors is not None: oprot.writeFieldBegin('component_executors', TType.MAP, 4) oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors)) - for kiter540,viter541 in self.component_executors.items(): - oprot.writeString(kiter540.encode('utf-8')) - oprot.writeI32(viter541) + for kiter554,viter555 in self.component_executors.items(): + oprot.writeString(kiter554.encode('utf-8')) + oprot.writeI32(viter555) oprot.writeMapEnd() oprot.writeFieldEnd() if self.launch_time_secs is not None: @@ -7929,9 +8535,9 @@ class StormBase: if self.component_debug is not None: oprot.writeFieldBegin('component_debug', TType.MAP, 9) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.component_debug)) - for kiter542,viter543 in self.component_debug.items(): - oprot.writeString(kiter542.encode('utf-8')) - viter543.write(oprot) + for kiter556,viter557 in self.component_debug.items(): + oprot.writeString(kiter556.encode('utf-8')) + viter557.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8011,13 +8617,13 @@ class ClusterWorkerHeartbeat: elif fid == 2: if ftype == TType.MAP: self.executor_stats = {} - (_ktype545, _vtype546, _size544 ) = iprot.readMapBegin() - for _i548 in xrange(_size544): - _key549 = ExecutorInfo() - _key549.read(iprot) - _val550 = ExecutorStats() - _val550.read(iprot) - self.executor_stats[_key549] = _val550 + (_ktype559, _vtype560, _size558 ) = iprot.readMapBegin() + for _i562 in xrange(_size558): + _key563 = ExecutorInfo() + _key563.read(iprot) + _val564 = ExecutorStats() + _val564.read(iprot) + self.executor_stats[_key563] = _val564 iprot.readMapEnd() else: iprot.skip(ftype) @@ -8048,9 +8654,9 @@ class ClusterWorkerHeartbeat: if self.executor_stats is not None: oprot.writeFieldBegin('executor_stats', TType.MAP, 2) oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats)) - for kiter551,viter552 in self.executor_stats.items(): - kiter551.write(oprot) - viter552.write(oprot) + for kiter565,viter566 in self.executor_stats.items(): + kiter565.write(oprot) + viter566.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.time_secs is not None: @@ -8203,12 +8809,12 @@ class LocalStateData: if fid == 1: if ftype == TType.MAP: self.serialized_parts = {} - (_ktype554, _vtype555, _size553 ) = iprot.readMapBegin() - for _i557 in xrange(_size553): - _key558 = iprot.readString().decode('utf-8') - _val559 = ThriftSerializedObject() - _val559.read(iprot) - self.serialized_parts[_key558] = _val559 + (_ktype568, _vtype569, _size567 ) = iprot.readMapBegin() + for _i571 in xrange(_size567): + _key572 = iprot.readString().decode('utf-8') + _val573 = ThriftSerializedObject() + _val573.read(iprot) + self.serialized_parts[_key572] = _val573 iprot.readMapEnd() else: iprot.skip(ftype) @@ -8225,9 +8831,9 @@ class LocalStateData: if self.serialized_parts is not None: oprot.writeFieldBegin('serialized_parts', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts)) - for kiter560,viter561 in self.serialized_parts.items(): - oprot.writeString(kiter560.encode('utf-8')) - viter561.write(oprot) + for kiter574,viter575 in self.serialized_parts.items(): + oprot.writeString(kiter574.encode('utf-8')) + viter575.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8292,11 +8898,11 @@ class LocalAssignment: elif fid == 2: if ftype == TType.LIST: self.executors = [] - (_etype565, _size562) = iprot.readListBegin() - for _i566 in xrange(_size562): - _elem567 = ExecutorInfo() - _elem567.read(iprot) - self.executors.append(_elem567) + (_etype579, _size576) = iprot.readListBegin() + for _i580 in xrange(_size576): + _elem581 = ExecutorInfo() + _elem581.read(iprot) + self.executors.append(_elem581) iprot.readListEnd() else: iprot.skip(ftype) @@ -8323,8 +8929,8 @@ class LocalAssignment: if self.executors is not None: oprot.writeFieldBegin('executors', TType.LIST, 2) oprot.writeListBegin(TType.STRUCT, len(self.executors)) - for iter568 in self.executors: - iter568.write(oprot) + for iter582 in self.executors: + iter582.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.resources is not None: @@ -8453,11 +9059,11 @@ class LSApprovedWorkers: if fid == 1: if ftype == TType.MAP: self.approved_workers = {} - (_ktype570, _vtype571, _size569 ) = iprot.readMapBegin() - for _i573 in xrange(_size569): - _key574 = iprot.readString().decode('utf-8') - _val575 = iprot.readI32() - self.approved_workers[_key574] = _val575 + (_ktype584, _vtype585, _size583 ) = iprot.readMapBegin() + for _i587 in xrange(_size583): + _key588 = iprot.readString().decode('utf-8') + _val589 = iprot.readI32() + self.approved_workers[_key588] = _val589 iprot.readMapEnd() else: iprot.skip(ftype) @@ -8474,9 +9080,9 @@ class LSApprovedWorkers: if self.approved_workers is not None: oprot.writeFieldBegin('approved_workers', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers)) - for kiter576,viter577 in self.approved_workers.items(): - oprot.writeString(kiter576.encode('utf-8')) - oprot.writeI32(viter577) + for kiter590,viter591 in self.approved_workers.items(): + oprot.writeString(kiter590.encode('utf-8')) + oprot.writeI32(viter591) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8530,12 +9136,12 @@ class LSSupervisorAssignments: if fid == 1: if ftype == TType.MAP: self.assignments = {} - (_ktype579, _vtype580, _size578 ) = iprot.readMapBegin() - for _i582 in xrange(_size578): - _key583 = iprot.readI32() - _val584 = LocalAssignment() - _val584.read(iprot) - self.assignments[_key583] = _val584 + (_ktype593, _vtype594, _size592 ) = iprot.readMapBegin() + for _i596 in xrange(_size592): + _key597 = iprot.readI32() + _val598 = LocalAssignment() + _val598.read(iprot) + self.assignments[_key597] = _val598 iprot.readMapEnd() else: iprot.skip(ftype) @@ -8552,9 +9158,9 @@ class LSSupervisorAssignments: if self.assignments is not None: oprot.writeFieldBegin('assignments', TType.MAP, 1) oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments)) - for kiter585,viter586 in self.assignments.items(): - oprot.writeI32(kiter585) - viter586.write(oprot) + for kiter599,viter600 in self.assignments.items(): + oprot.writeI32(kiter599) + viter600.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8627,11 +9233,11 @@ class LSWorkerHeartbeat: elif fid == 3: if ftype == TType.LIST: self.executors = [] - (_etype590, _size587) = iprot.readListBegin() - for _i591 in xrange(_size587): - _elem592 = ExecutorInfo() - _elem592.read(iprot) - self.executors.append(_elem592) + (_etype604, _size601) = iprot.readListBegin() + for _i605 in xrange(_size601): + _elem606 = ExecutorInfo() + _elem606.read(iprot) + self.executors.append(_elem606) iprot.readListEnd() else: iprot.skip(ftype) @@ -8661,8 +9267,8 @@ class LSWorkerHeartbeat: if self.executors is not None: oprot.writeFieldBegin('executors', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.executors)) - for iter593 in self.executors: - iter593.write(oprot) + for iter607 in self.executors: + iter607.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.port is not None: @@ -8748,20 +9354,20 @@ class LSTopoHistory: elif fid == 3: if ftype == TType.LIST: self.users = [] - (_etype597, _size594) = iprot.readListBegin() - for _i598 in xrange(_size594): - _elem599 = iprot.readString().decode('utf-8') - self.users.append(_elem599) + (_etype611, _size608) = iprot.readListBegin() + for _i612 in xrange(_size608): + _elem613 = iprot.readString().decode('utf-8') + self.users.append(_elem613) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.LIST: self.groups = [] - (_etype603, _size600) = iprot.readListBegin() - for _i604 in xrange(_size600): - _elem605 = iprot.readString().decode('utf-8') - self.groups.append(_elem605) + (_etype617, _size614) = iprot.readListBegin() + for _i618 in xrange(_size614): + _elem619 = iprot.readString().decode('utf-8') + self.groups.append(_elem619) iprot.readListEnd() else: iprot.skip(ftype) @@ -8786,15 +9392,15 @@ class LSTopoHistory: if self.users is not None: oprot.writeFieldBegin('users', TType.LIST, 3) oprot.writeListBegin(TType.STRING, len(self.users)) - for iter606 in self.users: - oprot.writeString(iter606.encode('utf-8')) + for iter620 in self.users: + oprot.writeString(iter620.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() if self.groups is not None: oprot.writeFieldBegin('groups', TType.LIST, 4) oprot.writeListBegin(TType.STRING, len(self.groups)) - for iter607 in self.groups: - oprot.writeString(iter607.encode('utf-8')) + for iter621 in self.groups: + oprot.writeString(iter621.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8857,11 +9463,11 @@ class LSTopoHistoryList: if fid == 1: if ftype == TType.LIST: self.topo_history = [] - (_etype611, _size608) = iprot.readListBegin() - for _i612 in xrange(_size608): - _elem613 = LSTopoHistory() - _elem613.read(iprot) - self.topo_history.append(_elem613) + (_etype625, _size622) = iprot.readListBegin() + for _i626 in xrange(_size622): + _elem627 = LSTopoHistory() + _elem627.read(iprot) + self.topo_history.append(_elem627) iprot.readListEnd() else: iprot.skip(ftype) @@ -8878,8 +9484,8 @@ class LSTopoHistoryList: if self.topo_history is not None: oprot.writeFieldBegin('topo_history', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.topo_history)) - for iter614 in self.topo_history: - iter614.write(oprot) + for iter628 in self.topo_history: + iter628.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -9214,12 +9820,12 @@ class LogConfig: if fid == 2: if ftype == TType.MAP: self.named_logger_level = {} - (_ktype616, _vtype617, _size615 ) = iprot.readMapBegin() - for _i619 in xrange(_size615): - _key620 = iprot.readString().decode('utf-8') - _val621 = LogLevel() - _val621.read(iprot) - self.named_logger_level[_key620] = _val621 + (_ktype630, _vtype631, _size629 ) = iprot.readMapBegin() + for _i633 in xrange(_size629): + _key634 = iprot.readString().decode('utf-8') + _val635 = LogLevel() + _val635.read(iprot) + self.named_logger_level[_key634] = _val635 iprot.readMapEnd() else: iprot.skip(ftype) @@ -9236,9 +9842,9 @@ class LogConfig: if self.named_logger_level is not None: oprot.writeFieldBegin('named_logger_level', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.named_logger_level)) - for kiter622,viter623 in self.named_logger_level.items(): - oprot.writeString(kiter622.encode('utf-8')) - viter623.write(oprot) + for kiter636,viter637 in self.named_logger_level.items(): + oprot.writeString(kiter636.encode('utf-8')) + viter637.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -9290,10 +9896,10 @@ class TopologyHistoryInfo: if fid == 1: if ftype == TType.LIST: self.topo_ids = [] - (_etype627, _size624) = iprot.readListBegin() - for _i628 in xrange(_size624): - _elem629 = iprot.readString().decode('utf-8') - self.topo_ids.append(_elem629) + (_etype641, _size638) = iprot.readListBegin() + for _i642 in xrange(_size638): + _elem643 = iprot.readString().decode('utf-8') + self.topo_ids.append(_elem643) iprot.readListEnd() else: iprot.skip(ftype) @@ -9310,8 +9916,8 @@ class TopologyHistoryInfo: if self.topo_ids is not None: oprot.writeFieldBegin('topo_ids', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.topo_ids)) - for iter630 in self.topo_ids: - oprot.writeString(iter630.encode('utf-8')) + for iter644 in self.topo_ids: + oprot.writeString(iter644.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -9961,11 +10567,11 @@ class HBRecords: if fid == 1: if ftype == TType.LIST: self.pulses = [] - (_etype627, _size624) = iprot.readListBegin() - for _i628 in xrange(_size624): - _elem629 = HBPulse() - _elem629.read(iprot) - self.pulses.append(_elem629) + (_etype648, _size645) = iprot.readListBegin() + for _i649 in xrange(_size645): + _elem650 = HBPulse() + _elem650.read(iprot) + self.pulses.append(_elem650) iprot.readListEnd() else: iprot.skip(ftype) @@ -9982,8 +10588,8 @@ class HBRecords: if self.pulses is not None: oprot.writeFieldBegin('pulses', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.pulses)) - for iter630 in self.pulses: - iter630.write(oprot) + for iter651 in self.pulses: + iter651.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10035,10 +10641,10 @@ class HBNodes: if fid == 1: if ftype == TType.LIST: self.pulseIds = [] - (_etype634, _size631) = iprot.readListBegin() - for _i635 in xrange(_size631): - _elem636 = iprot.readString().decode('utf-8') - self.pulseIds.append(_elem636) + (_etype655, _size652) = iprot.readListBegin() + for _i656 in xrange(_size652): + _elem657 = iprot.readString().decode('utf-8') + self.pulseIds.append(_elem657) iprot.readListEnd() else: iprot.skip(ftype) @@ -10055,8 +10661,8 @@ class HBNodes: if self.pulseIds is not None: oprot.writeFieldBegin('pulseIds', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.pulseIds)) - for iter637 in self.pulseIds: - oprot.writeString(iter637.encode('utf-8')) + for iter658 in self.pulseIds: + oprot.writeString(iter658.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 677de2b..08be005 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -136,6 +136,14 @@ exception InvalidTopologyException { 1: required string msg; } +exception KeyNotFoundException { + 1: required string msg; +} + +exception KeyAlreadyExistsException { + 1: required string msg; +} + struct TopologySummary { 1: required string id; 2: required string name; @@ -371,6 +379,42 @@ struct SubmitOptions { 2: optional Credentials creds; } +enum AccessControlType { + OTHER = 1, + USER = 2 + //eventually ,GROUP=3 +} + +struct AccessControl { + 1: required AccessControlType type; + 2: optional string name; //Name of user or group in ACL + 3: required i32 access; //bitmasks READ=0x1, WRITE=0x2, ADMIN=0x4 +} + +struct SettableBlobMeta { + 1: required list<AccessControl> acl; + 2: optional i32 replication_factor +} + +struct ReadableBlobMeta { + 1: required SettableBlobMeta settable; + //This is some indication of a version of a BLOB. The only guarantee is + // if the data changed in the blob the version will be different. + 2: required i64 version; +} + +struct ListBlobsResult { + 1: required list<string> keys; + 2: required string session; +} + +struct BeginDownloadResult { + //Same version as in ReadableBlobMeta + 1: required i64 version; + 2: required string session; + 3: optional i64 data_size; +} + struct SupervisorInfo { 1: required i64 time_secs; 2: required string hostname; @@ -565,6 +609,21 @@ service Nimbus { void uploadNewCredentials(1: string name, 2: Credentials creds) throws (1: NotAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); + string beginCreateBlob(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyAlreadyExistsException kae); + string beginUpdateBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + void uploadBlobChunk(1: string session, 2: binary chunk) throws (1: AuthorizationException aze); + void finishBlobUpload(1: string session) throws (1: AuthorizationException aze); + void cancelBlobUpload(1: string session) throws (1: AuthorizationException aze); + ReadableBlobMeta getBlobMeta(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze); + void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning + i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + void createStateInZookeeper(1: string key); // creates state in zookeeper when blob is uploaded through command line + // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs string beginFileUpload() throws (1: AuthorizationException aze); http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj index eea4637..82f305b 100644 --- a/storm-core/test/clj/backtype/storm/cluster_test.clj +++ b/storm-core/test/clj/backtype/storm/cluster_test.clj @@ -208,14 +208,15 @@ (.set-credentials! state "storm1" {"b" "b"} {}) (is (= {"b" "b"} (.credentials state "storm1" nil))) - (is (= [] (.code-distributor state nil))) - (.setup-code-distributor! state "storm1" nimbusInfo1) - (is (= ["storm1"] (.code-distributor state nil))) - (is (= [nimbusInfo1] (.code-distributor-info state "storm1"))) - (.setup-code-distributor! state "storm1" nimbusInfo2) - (is (= #{nimbusInfo1 nimbusInfo2} (set (.code-distributor-info state "storm1")))) - (.remove-storm! state "storm1") - (is (= [] (.code-distributor state nil))) + (is (= [] (.blobstore-info state nil))) + (.setup-blobstore! state "key1" nimbusInfo1 "1") + (is (= ["key1"] (.blobstore-info state nil))) + (is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstore-info state "key1"))) + (.setup-blobstore! state "key1" nimbusInfo2 "1") + (is (= #{(str (.toHostPortString nimbusInfo1) "-1") + (str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstore-info state "key1")))) + (.remove-blobstore-key! state "key1") + (is (= [] (.blobstore-info state nil))) (is (= [] (.nimbuses state))) (.add-nimbus-host! state "nimbus1:port" nimbusSummary1) @@ -266,8 +267,7 @@ (let [state1 (mk-storm-state zk-port) state2 (mk-storm-state zk-port) supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil) - supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil) - ] + supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)] (is (= [] (.supervisors state1 nil))) (.supervisor-heartbeat! state2 "2" supervisor-info2) (.supervisor-heartbeat! state1 "1" supervisor-info1) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj index 53d4bb8..0847883 100644 --- a/storm-core/test/clj/backtype/storm/nimbus_test.clj +++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj @@ -29,7 +29,7 @@ LogConfig LogLevel LogLevelAction]) (:import [java.util HashMap]) (:import [java.io File]) - (:import [backtype.storm.utils Time]) + (:import [backtype.storm.utils Time Utils]) (:import [org.apache.commons.io FileUtils]) (:use [backtype.storm testing MockAutoCred util config log timer zookeeper]) (:use [backtype.storm.daemon common]) @@ -939,41 +939,15 @@ (bind storm-id1 (get-storm-id cluster-state "t1")) (bind storm-id2 (get-storm-id cluster-state "t2")) (.shutdown nimbus) - (rmr (master-stormdist-root conf storm-id1)) + (let [blob-store (Utils/getNimbusBlobStore conf nil)] + (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state) + (.shutdown blob-store)) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) (is ( = #{storm-id2} (set (.active-storms cluster-state)))) (.shutdown nimbus) (.disconnect cluster-state) ))))) - -(deftest test-cleans-corrupt - (with-inprocess-zookeeper zk-port - (with-local-tmp [nimbus-dir] - (stubbing [zk-leader-elector (mock-leader-elector)] - (letlocals - (bind conf (merge (read-storm-config) - {STORM-ZOOKEEPER-SERVERS ["localhost"] - STORM-CLUSTER-MODE "local" - STORM-ZOOKEEPER-PORT zk-port - STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} - {})) - (submit-local-topology nimbus "t1" {} topology) - (submit-local-topology nimbus "t2" {} topology) - (bind storm-id1 (get-storm-id cluster-state "t1")) - (bind storm-id2 (get-storm-id cluster-state "t2")) - (.shutdown nimbus) - (rmr (master-stormdist-root conf storm-id1)) - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (is ( = #{storm-id2} (set (.active-storms cluster-state)))) - (.shutdown nimbus) - (.disconnect cluster-state) - ))))) - ;(deftest test-no-overlapping-slots ; ;; test that same node+port never appears across 2 assignments ; ) @@ -1173,7 +1147,7 @@ nimbus/check-authorization! [1 2 3] expected-name expected-conf expected-operation) (verify-first-call-args-for-indices - nimbus/try-read-storm-topology [0] expected-conf)))))))))) + nimbus/try-read-storm-topology [0] "fake-id")))))))))) (deftest test-nimbus-iface-getTopology-methods-throw-correctly (with-local-cluster [cluster] @@ -1230,7 +1204,8 @@ :status {:type bogus-type}} } ] - (stubbing [topology-bases bogus-bases] + (stubbing [topology-bases bogus-bases + nimbus/get-blob-replication-count 1] (let [topos (.get_topologies (.getClusterInfo nimbus))] ; The number of topologies in the summary is correct. (is (= (count @@ -1265,6 +1240,7 @@ digest "storm:thisisapoorpassword" auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme STORM-ZOOKEEPER-AUTH-PAYLOAD digest + STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.DefaultPrincipalToLocal" NIMBUS-THRIFT-PORT 6666} expected-acls nimbus/NIMBUS-ZK-ACLS fake-inimbus (reify INimbus (getForcedScheduler [this] nil))] @@ -1272,10 +1248,11 @@ mk-authorization-handler nil cluster/mk-storm-cluster-state nil nimbus/file-cache-map nil + nimbus/mk-blob-cache-map nil + nimbus/mk-bloblist-cache-map nil uptime-computer nil new-instance nil mk-timer nil - nimbus/mk-code-distributor nil zk-leader-elector nil nimbus/mk-scheduler nil] (nimbus/nimbus-data auth-conf fake-inimbus) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj index ea45ddc..18d4ada 100644 --- a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj +++ b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj @@ -68,5 +68,6 @@ (.setSubject rc s) (is (not (nil? (.principal rc)))) (is (= (-> rc .principal .getName) principal-name)) + (.setSubject rc nil) ) ) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 04c8600..776ad6e 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -324,7 +324,8 @@ set-worker-user! nil supervisor/jlp nil worker-artifacts-root "/tmp/workers-artifacts" - supervisor/write-log-metadata! nil] + supervisor/write-log-metadata! nil + supervisor/create-blobstore-links nil] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port @@ -346,8 +347,9 @@ launch-process nil set-worker-user! nil supervisor/jlp nil - worker-artifacts-root "/tmp/workers-artifacts" - supervisor/write-log-metadata! nil] + supervisor/write-log-metadata! nil + supervisor/create-blobstore-links nil + worker-artifacts-root "/tmp/workers-artifacts"] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port @@ -367,7 +369,8 @@ set-worker-user! nil supervisor/write-log-metadata! nil launch-process nil - current-classpath (str file-path-separator "base")] + current-classpath (str file-path-separator "base") + supervisor/create-blobstore-links nil] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port @@ -388,7 +391,8 @@ launch-process nil set-worker-user! nil supervisor/write-log-metadata! nil - current-classpath (str file-path-separator "base")] + current-classpath (str file-path-separator "base") + supervisor/create-blobstore-links nil] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port @@ -540,8 +544,8 @@ cluster/mk-storm-cluster-state nil supervisor-state nil local-hostname nil - supervisor/mk-code-distributor nil - mk-timer nil] + mk-timer nil + supervisor-local-dir nil] (supervisor/supervisor-data auth-conf nil fake-isupervisor) (verify-call-times-for cluster/mk-storm-cluster-state 1) (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2] http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java new file mode 100644 index 0000000..388b491 --- /dev/null +++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.Config; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.SettableBlobMeta; + +import backtype.storm.security.auth.NimbusPrincipal; +import backtype.storm.security.auth.SingleUserPrincipal; +import backtype.storm.utils.Utils; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; +import java.util.HashSet; +import java.util.Set; +import java.util.Iterator; +import java.util.Arrays; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +public class BlobStoreTest { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); + URI base; + File baseFile; + private static Map conf = new HashMap(); + public static final int READ = 0x01; + public static final int WRITE = 0x02; + public static final int ADMIN = 0x04; + + @Before + public void init() { + initializeConfigs(); + baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID()); + base = baseFile.toURI(); + } + + @After + public void cleanup() throws IOException { + FileUtils.deleteDirectory(baseFile); + } + + // Method which initializes nimbus admin + public static void initializeConfigs() { + conf.put(Config.NIMBUS_ADMINS,"admin"); + conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); + } + + // Gets Nimbus Subject with NimbusPrincipal set on it + public static Subject getNimbusSubject() { + Subject nimbus = new Subject(); + nimbus.getPrincipals().add(new NimbusPrincipal()); + return nimbus; + } + + // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization + public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + Set<String> expected = new HashSet<String>(Arrays.asList(keys)); + Set<String> found = new HashSet<String>(); + Iterator<String> c = store.listKeys(); + while (c.hasNext()) { + String keyName = c.next(); + found.add(keyName); + } + Set<String> extra = new HashSet<String>(found); + extra.removeAll(expected); + assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty()); + Set<String> missing = new HashSet<String>(expected); + missing.removeAll(found); + assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty()); + } + + public static void assertStoreHasExactly(BlobStore store, String ... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + assertStoreHasExactly(store, null, keys); + } + + // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) + public static int readInt(BlobStore store, Subject who, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + InputStream in = store.getBlob(key, who); + try { + return in.read(); + } finally { + in.close(); + } + } + + public static int readInt(BlobStore store, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + return readInt(store, null, key); + } + + public static void readAssertEquals(BlobStore store, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, key)); + } + + // Checks for assertion when we turn on security + public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, who, key)); + } + + private LocalFsBlobStore initLocalFs() { + LocalFsBlobStore store = new LocalFsBlobStore(); + // Spy object that tries to mock the real object store + LocalFsBlobStore spy = spy(store); + Mockito.doNothing().when(spy).checkForBlobUpdate("test"); + Mockito.doNothing().when(spy).checkForBlobUpdate("other"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls"); + Map conf = Utils.readStormConfig(); + conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal"); + ArrayList<String> zookeeper_list = new ArrayList<>(); + spy.prepare(conf, null, null); + return spy; + } + + @Test + public void testLocalFsWithAuth() throws Exception { + testWithAuthentication(initLocalFs()); + } + + @Test + public void testBasicLocalFs() throws Exception { + testBasic(initLocalFs()); + } + + @Test + public void testMultipleLocalFs() throws Exception { + testMultiple(initLocalFs()); + } + + public Subject getSubject(String name) { + Subject subject = new Subject(); + SingleUserPrincipal user = new SingleUserPrincipal(name); + subject.getPrincipals().add(user); + return subject; + } + + // Check for Blobstore with authentication + public void testWithAuthentication(BlobStore store) throws Exception { + //Test for Nimbus Admin + Subject admin = getSubject("admin"); + assertStoreHasExactly(store); + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + AtomicOutputStream out = store.createBlob("test", metadata, admin); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", admin); + + //Test for Supervisor Admin + Subject supervisor = getSubject("supervisor"); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, supervisor); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", supervisor); + + //Test for Nimbus itself as a user + Subject nimbus = getNimbusSubject(); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, nimbus); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", nimbus); + + // Test with a dummy test_subject for cases where subject !=null (security turned on) + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + out = store.createBlob("test", metadata, who); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls are not set for the blob (DEFAULT) + LOG.info("Creating test again"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because + // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have + // complete access to the blob + assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 2); + + LOG.info("Updating test"); + out = store.updateBlob("test", who); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", who); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + // Test for subject with no principals and acls set to WORLD_EVERYTHING + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test"); + out = store.createBlob("test-empty-subject-WE", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test-empty-subject-WE", "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); + + // Test for subject with no principals and acls set to DEFAULT + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating other"); + out = store.createBlob("test-empty-subject-DEF", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + try { + out.close(); + } catch (IOException e) { + // This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } + } + + public void testBasic(BlobStore store) throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING); + AtomicOutputStream out = store.createBlob("test", metadata, null); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEquals(store, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store); + + // The following tests are run for both hdfs and local store to test the + // update blob interface + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test again"); + out = store.createBlob("test", metadata, null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test"); + if (store instanceof LocalFsBlobStore) { + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + } + readAssertEquals(store, "test", 2); + LOG.info("Updating test"); + out = store.updateBlob("test", null); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", null); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore + if (store instanceof LocalFsBlobStore) { + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating test for empty acls when security is off"); + out = store.createBlob("test-empty-acls", metadata, null); + LOG.info("metadata {}", metadata); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test-empty-acls", "test"); + // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing only for LocalFsBlobstore + // as the HdfsBlobstore gets the subject information of the local system user and behaves as it is + // always authenticated. + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.get_acl().toString().contains("OTHER")); + + LOG.info("Deleting test-empty-acls"); + store.deleteBlob("test-empty-acls", null); + } + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + try { + out.close(); + } catch (IOException e) { + // This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } + } + + + public void testMultiple(BlobStore store) throws Exception { + + assertStoreHasExactly(store); + LOG.info("Creating test"); + AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 1); + + LOG.info("Creating other"); + out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 2); + + LOG.info("Updating other"); + out = store.updateBlob("other", null); + out.write(5); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store, "other"); + readAssertEquals(store, "other", 5); + + LOG.info("Creating test again"); + out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 2); + readAssertEquals(store, "other", 5); + + LOG.info("Updating test"); + out = store.updateBlob("test", null); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 3); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting other"); + store.deleteBlob("other", null); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", null); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + try { + out.close(); + } catch (IOException e) { + // This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } + } + + @Test + public void testGetFileLength() + throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException { + LocalFsBlobStore store = initLocalFs(); + AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null); + out.write(1); + out.close(); + assertEquals(1, store.getBlob("test", null).getFileLength()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java new file mode 100644 index 0000000..63f633d --- /dev/null +++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.Config; +import backtype.storm.nimbus.NimbusInfo; +import backtype.storm.utils.Utils; +import org.apache.commons.io.FileUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for most of the testable utility methods + * and BlobSynchronizer class methods + */ +public class BlobSynchronizerTest { + private URI base; + private File baseFile; + private static Map conf = new HashMap(); + private NIOServerCnxnFactory factory; + + @Before + public void init() throws Exception { + initializeConfigs(); + baseFile = new File("/tmp/blob-store-test-"+ UUID.randomUUID()); + base = baseFile.toURI(); + } + + @After + public void cleanUp() throws IOException { + FileUtils.deleteDirectory(baseFile); + if (factory != null) { + factory.shutdown(); + } + } + + // Method which initializes nimbus admin + public static void initializeConfigs() { + conf.put(Config.NIMBUS_ADMINS,"admin"); + conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); + } + + private LocalFsBlobStore initLocalFs() { + LocalFsBlobStore store = new LocalFsBlobStore(); + Map conf = Utils.readStormConfig(); + conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal"); + this.conf = conf; + store.prepare(conf, null, null); + return store; + } + + @Test + public void testBlobSynchronizerForKeysToDownload() { + BlobStore store = initLocalFs(); + BlobSynchronizer sync = new BlobSynchronizer(store, conf); + // test for keylist to download + Set<String> zkSet = new HashSet<String>(); + zkSet.add("key1"); + Set<String> blobStoreSet = new HashSet<String>(); + blobStoreSet.add("key1"); + Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Not Empty", resultSet.isEmpty()); + zkSet.add("key1"); + blobStoreSet.add("key2"); + resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Not Empty", resultSet.isEmpty()); + blobStoreSet.remove("key1"); + blobStoreSet.remove("key2"); + zkSet.add("key1"); + resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Unexpected keys to download", (resultSet.size() == 1) && (resultSet.contains("key1"))); + } + + @Test + public void testGetLatestSequenceNumber() throws Exception { + List<String> stateInfoList = new ArrayList<String>(); + stateInfoList.add("nimbus1:8000-2"); + stateInfoList.add("nimbus-1:8000-4"); + assertTrue("Failed to get the latest version", BlobStoreUtils.getLatestSequenceNumber(stateInfoList)==4); + } + + @Test + public void testNimbodesWithLatestVersionOfBlob() throws Exception { + TestingServer server = new TestingServer(); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + zkClient.start(); + // Creating nimbus hosts containing latest version of blob + zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); + zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); + Set<NimbusInfo> set = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1"); + assertEquals("Failed to get the correct nimbus hosts with latest blob version", (set.iterator().next()).getHost(),"nimbus2"); + zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); + zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); + zkClient.close(); + server.close(); + } + + @Test + public void testNormalizeVersionInfo () throws Exception { + BlobKeySequenceInfo info1 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1"); + assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800")); + assertTrue(info1.getSequenceNumber().equals("1")); + BlobKeySequenceInfo info2 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1"); + assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800")); + assertTrue(info2.getSequenceNumber().equals("1")); + } +}
