http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-client/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index 9e6725c..5d942b2 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -9452,6 +9452,8 @@ class WorkerResources: - cpu - shared_mem_on_heap - shared_mem_off_heap + - resources + - shared_resources """ thrift_spec = ( @@ -9461,14 +9463,18 @@ class WorkerResources: (3, TType.DOUBLE, 'cpu', None, None, ), # 3 (4, TType.DOUBLE, 'shared_mem_on_heap', None, None, ), # 4 (5, TType.DOUBLE, 'shared_mem_off_heap', None, None, ), # 5 + (6, TType.MAP, 'resources', (TType.STRING,None,TType.DOUBLE,None), None, ), # 6 + (7, TType.MAP, 'shared_resources', (TType.STRING,None,TType.DOUBLE,None), None, ), # 7 ) - def __init__(self, mem_on_heap=None, mem_off_heap=None, cpu=None, shared_mem_on_heap=None, shared_mem_off_heap=None,): + def __init__(self, mem_on_heap=None, mem_off_heap=None, cpu=None, shared_mem_on_heap=None, shared_mem_off_heap=None, resources=None, shared_resources=None,): self.mem_on_heap = mem_on_heap self.mem_off_heap = mem_off_heap self.cpu = cpu self.shared_mem_on_heap = shared_mem_on_heap self.shared_mem_off_heap = shared_mem_off_heap + self.resources = resources + self.shared_resources = shared_resources def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9504,6 +9510,28 @@ class WorkerResources: self.shared_mem_off_heap = iprot.readDouble() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.MAP: + self.resources = {} + (_ktype596, _vtype597, _size595 ) = iprot.readMapBegin() + for _i599 in xrange(_size595): + _key600 = iprot.readString().decode('utf-8') + _val601 = iprot.readDouble() + self.resources[_key600] = _val601 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.MAP: + self.shared_resources = {} + (_ktype603, _vtype604, _size602 ) = iprot.readMapBegin() + for _i606 in xrange(_size602): + _key607 = iprot.readString().decode('utf-8') + _val608 = iprot.readDouble() + self.shared_resources[_key607] = _val608 + iprot.readMapEnd() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9534,6 +9562,22 @@ class WorkerResources: oprot.writeFieldBegin('shared_mem_off_heap', TType.DOUBLE, 5) oprot.writeDouble(self.shared_mem_off_heap) oprot.writeFieldEnd() + if self.resources is not None: + oprot.writeFieldBegin('resources', TType.MAP, 6) + oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources)) + for kiter609,viter610 in self.resources.items(): + oprot.writeString(kiter609.encode('utf-8')) + oprot.writeDouble(viter610) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.shared_resources is not None: + oprot.writeFieldBegin('shared_resources', TType.MAP, 7) + oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.shared_resources)) + for kiter611,viter612 in self.shared_resources.items(): + oprot.writeString(kiter611.encode('utf-8')) + oprot.writeDouble(viter612) + oprot.writeMapEnd() + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9548,6 +9592,8 @@ class WorkerResources: value = (value * 31) ^ hash(self.cpu) value = (value * 31) ^ hash(self.shared_mem_on_heap) value = (value * 31) ^ hash(self.shared_mem_off_heap) + value = (value * 31) ^ hash(self.resources) + value = (value * 31) ^ hash(self.shared_resources) return value def __repr__(self): @@ -9630,68 +9676,68 @@ class Assignment: elif fid == 2: if ftype == TType.MAP: self.node_host = {} - (_ktype596, _vtype597, _size595 ) = iprot.readMapBegin() - for _i599 in xrange(_size595): - _key600 = iprot.readString().decode('utf-8') - _val601 = iprot.readString().decode('utf-8') - self.node_host[_key600] = _val601 + (_ktype614, _vtype615, _size613 ) = iprot.readMapBegin() + for _i617 in xrange(_size613): + _key618 = iprot.readString().decode('utf-8') + _val619 = iprot.readString().decode('utf-8') + self.node_host[_key618] = _val619 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.MAP: self.executor_node_port = {} - (_ktype603, _vtype604, _size602 ) = iprot.readMapBegin() - for _i606 in xrange(_size602): - _key607 = [] - (_etype612, _size609) = iprot.readListBegin() - for _i613 in xrange(_size609): - _elem614 = iprot.readI64() - _key607.append(_elem614) + (_ktype621, _vtype622, _size620 ) = iprot.readMapBegin() + for _i624 in xrange(_size620): + _key625 = [] + (_etype630, _size627) = iprot.readListBegin() + for _i631 in xrange(_size627): + _elem632 = iprot.readI64() + _key625.append(_elem632) iprot.readListEnd() - _val608 = NodeInfo() - _val608.read(iprot) - self.executor_node_port[_key607] = _val608 + _val626 = NodeInfo() + _val626.read(iprot) + self.executor_node_port[_key625] = _val626 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.MAP: self.executor_start_time_secs = {} - (_ktype616, _vtype617, _size615 ) = iprot.readMapBegin() - for _i619 in xrange(_size615): - _key620 = [] - (_etype625, _size622) = iprot.readListBegin() - for _i626 in xrange(_size622): - _elem627 = iprot.readI64() - _key620.append(_elem627) + (_ktype634, _vtype635, _size633 ) = iprot.readMapBegin() + for _i637 in xrange(_size633): + _key638 = [] + (_etype643, _size640) = iprot.readListBegin() + for _i644 in xrange(_size640): + _elem645 = iprot.readI64() + _key638.append(_elem645) iprot.readListEnd() - _val621 = iprot.readI64() - self.executor_start_time_secs[_key620] = _val621 + _val639 = iprot.readI64() + self.executor_start_time_secs[_key638] = _val639 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.MAP: self.worker_resources = {} - (_ktype629, _vtype630, _size628 ) = iprot.readMapBegin() - for _i632 in xrange(_size628): - _key633 = NodeInfo() - _key633.read(iprot) - _val634 = WorkerResources() - _val634.read(iprot) - self.worker_resources[_key633] = _val634 + (_ktype647, _vtype648, _size646 ) = iprot.readMapBegin() + for _i650 in xrange(_size646): + _key651 = NodeInfo() + _key651.read(iprot) + _val652 = WorkerResources() + _val652.read(iprot) + self.worker_resources[_key651] = _val652 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 6: if ftype == TType.MAP: self.total_shared_off_heap = {} - (_ktype636, _vtype637, _size635 ) = iprot.readMapBegin() - for _i639 in xrange(_size635): - _key640 = iprot.readString().decode('utf-8') - _val641 = iprot.readDouble() - self.total_shared_off_heap[_key640] = _val641 + (_ktype654, _vtype655, _size653 ) = iprot.readMapBegin() + for _i657 in xrange(_size653): + _key658 = iprot.readString().decode('utf-8') + _val659 = iprot.readDouble() + self.total_shared_off_heap[_key658] = _val659 iprot.readMapEnd() else: iprot.skip(ftype) @@ -9717,47 +9763,47 @@ class Assignment: if self.node_host is not None: oprot.writeFieldBegin('node_host', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host)) - for kiter642,viter643 in self.node_host.items(): - oprot.writeString(kiter642.encode('utf-8')) - oprot.writeString(viter643.encode('utf-8')) + for kiter660,viter661 in self.node_host.items(): + oprot.writeString(kiter660.encode('utf-8')) + oprot.writeString(viter661.encode('utf-8')) oprot.writeMapEnd() oprot.writeFieldEnd() if self.executor_node_port is not None: oprot.writeFieldBegin('executor_node_port', TType.MAP, 3) oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port)) - for kiter644,viter645 in self.executor_node_port.items(): - oprot.writeListBegin(TType.I64, len(kiter644)) - for iter646 in kiter644: - oprot.writeI64(iter646) + for kiter662,viter663 in self.executor_node_port.items(): + oprot.writeListBegin(TType.I64, len(kiter662)) + for iter664 in kiter662: + oprot.writeI64(iter664) oprot.writeListEnd() - viter645.write(oprot) + viter663.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.executor_start_time_secs is not None: oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4) oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs)) - for kiter647,viter648 in self.executor_start_time_secs.items(): - oprot.writeListBegin(TType.I64, len(kiter647)) - for iter649 in kiter647: - oprot.writeI64(iter649) + for kiter665,viter666 in self.executor_start_time_secs.items(): + oprot.writeListBegin(TType.I64, len(kiter665)) + for iter667 in kiter665: + oprot.writeI64(iter667) oprot.writeListEnd() - oprot.writeI64(viter648) + oprot.writeI64(viter666) oprot.writeMapEnd() oprot.writeFieldEnd() if self.worker_resources is not None: oprot.writeFieldBegin('worker_resources', TType.MAP, 5) oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.worker_resources)) - for kiter650,viter651 in self.worker_resources.items(): - kiter650.write(oprot) - viter651.write(oprot) + for kiter668,viter669 in self.worker_resources.items(): + kiter668.write(oprot) + viter669.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.total_shared_off_heap is not None: oprot.writeFieldBegin('total_shared_off_heap', TType.MAP, 6) oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.total_shared_off_heap)) - for kiter652,viter653 in self.total_shared_off_heap.items(): - oprot.writeString(kiter652.encode('utf-8')) - oprot.writeDouble(viter653) + for kiter670,viter671 in self.total_shared_off_heap.items(): + oprot.writeString(kiter670.encode('utf-8')) + oprot.writeDouble(viter671) oprot.writeMapEnd() oprot.writeFieldEnd() if self.owner is not None: @@ -9946,11 +9992,11 @@ class StormBase: elif fid == 4: if ftype == TType.MAP: self.component_executors = {} - (_ktype655, _vtype656, _size654 ) = iprot.readMapBegin() - for _i658 in xrange(_size654): - _key659 = iprot.readString().decode('utf-8') - _val660 = iprot.readI32() - self.component_executors[_key659] = _val660 + (_ktype673, _vtype674, _size672 ) = iprot.readMapBegin() + for _i676 in xrange(_size672): + _key677 = iprot.readString().decode('utf-8') + _val678 = iprot.readI32() + self.component_executors[_key677] = _val678 iprot.readMapEnd() else: iprot.skip(ftype) @@ -9978,12 +10024,12 @@ class StormBase: elif fid == 9: if ftype == TType.MAP: self.component_debug = {} - (_ktype662, _vtype663, _size661 ) = iprot.readMapBegin() - for _i665 in xrange(_size661): - _key666 = iprot.readString().decode('utf-8') - _val667 = DebugOptions() - _val667.read(iprot) - self.component_debug[_key666] = _val667 + (_ktype680, _vtype681, _size679 ) = iprot.readMapBegin() + for _i683 in xrange(_size679): + _key684 = iprot.readString().decode('utf-8') + _val685 = DebugOptions() + _val685.read(iprot) + self.component_debug[_key684] = _val685 iprot.readMapEnd() else: iprot.skip(ftype) @@ -10022,9 +10068,9 @@ class StormBase: if self.component_executors is not None: oprot.writeFieldBegin('component_executors', TType.MAP, 4) oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors)) - for kiter668,viter669 in self.component_executors.items(): - oprot.writeString(kiter668.encode('utf-8')) - oprot.writeI32(viter669) + for kiter686,viter687 in self.component_executors.items(): + oprot.writeString(kiter686.encode('utf-8')) + oprot.writeI32(viter687) oprot.writeMapEnd() oprot.writeFieldEnd() if self.launch_time_secs is not None: @@ -10046,9 +10092,9 @@ class StormBase: if self.component_debug is not None: oprot.writeFieldBegin('component_debug', TType.MAP, 9) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.component_debug)) - for kiter670,viter671 in self.component_debug.items(): - oprot.writeString(kiter670.encode('utf-8')) - viter671.write(oprot) + for kiter688,viter689 in self.component_debug.items(): + oprot.writeString(kiter688.encode('utf-8')) + viter689.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.principal is not None: @@ -10138,13 +10184,13 @@ class ClusterWorkerHeartbeat: elif fid == 2: if ftype == TType.MAP: self.executor_stats = {} - (_ktype673, _vtype674, _size672 ) = iprot.readMapBegin() - for _i676 in xrange(_size672): - _key677 = ExecutorInfo() - _key677.read(iprot) - _val678 = ExecutorStats() - _val678.read(iprot) - self.executor_stats[_key677] = _val678 + (_ktype691, _vtype692, _size690 ) = iprot.readMapBegin() + for _i694 in xrange(_size690): + _key695 = ExecutorInfo() + _key695.read(iprot) + _val696 = ExecutorStats() + _val696.read(iprot) + self.executor_stats[_key695] = _val696 iprot.readMapEnd() else: iprot.skip(ftype) @@ -10175,9 +10221,9 @@ class ClusterWorkerHeartbeat: if self.executor_stats is not None: oprot.writeFieldBegin('executor_stats', TType.MAP, 2) oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats)) - for kiter679,viter680 in self.executor_stats.items(): - kiter679.write(oprot) - viter680.write(oprot) + for kiter697,viter698 in self.executor_stats.items(): + kiter697.write(oprot) + viter698.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.time_secs is not None: @@ -10330,12 +10376,12 @@ class LocalStateData: if fid == 1: if ftype == TType.MAP: self.serialized_parts = {} - (_ktype682, _vtype683, _size681 ) = iprot.readMapBegin() - for _i685 in xrange(_size681): - _key686 = iprot.readString().decode('utf-8') - _val687 = ThriftSerializedObject() - _val687.read(iprot) - self.serialized_parts[_key686] = _val687 + (_ktype700, _vtype701, _size699 ) = iprot.readMapBegin() + for _i703 in xrange(_size699): + _key704 = iprot.readString().decode('utf-8') + _val705 = ThriftSerializedObject() + _val705.read(iprot) + self.serialized_parts[_key704] = _val705 iprot.readMapEnd() else: iprot.skip(ftype) @@ -10352,9 +10398,9 @@ class LocalStateData: if self.serialized_parts is not None: oprot.writeFieldBegin('serialized_parts', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts)) - for kiter688,viter689 in self.serialized_parts.items(): - oprot.writeString(kiter688.encode('utf-8')) - viter689.write(oprot) + for kiter706,viter707 in self.serialized_parts.items(): + oprot.writeString(kiter706.encode('utf-8')) + viter707.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10425,11 +10471,11 @@ class LocalAssignment: elif fid == 2: if ftype == TType.LIST: self.executors = [] - (_etype693, _size690) = iprot.readListBegin() - for _i694 in xrange(_size690): - _elem695 = ExecutorInfo() - _elem695.read(iprot) - self.executors.append(_elem695) + (_etype711, _size708) = iprot.readListBegin() + for _i712 in xrange(_size708): + _elem713 = ExecutorInfo() + _elem713.read(iprot) + self.executors.append(_elem713) iprot.readListEnd() else: iprot.skip(ftype) @@ -10466,8 +10512,8 @@ class LocalAssignment: if self.executors is not None: oprot.writeFieldBegin('executors', TType.LIST, 2) oprot.writeListBegin(TType.STRUCT, len(self.executors)) - for iter696 in self.executors: - iter696.write(oprot) + for iter714 in self.executors: + iter714.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.resources is not None: @@ -10606,11 +10652,11 @@ class LSApprovedWorkers: if fid == 1: if ftype == TType.MAP: self.approved_workers = {} - (_ktype698, _vtype699, _size697 ) = iprot.readMapBegin() - for _i701 in xrange(_size697): - _key702 = iprot.readString().decode('utf-8') - _val703 = iprot.readI32() - self.approved_workers[_key702] = _val703 + (_ktype716, _vtype717, _size715 ) = iprot.readMapBegin() + for _i719 in xrange(_size715): + _key720 = iprot.readString().decode('utf-8') + _val721 = iprot.readI32() + self.approved_workers[_key720] = _val721 iprot.readMapEnd() else: iprot.skip(ftype) @@ -10627,9 +10673,9 @@ class LSApprovedWorkers: if self.approved_workers is not None: oprot.writeFieldBegin('approved_workers', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers)) - for kiter704,viter705 in self.approved_workers.items(): - oprot.writeString(kiter704.encode('utf-8')) - oprot.writeI32(viter705) + for kiter722,viter723 in self.approved_workers.items(): + oprot.writeString(kiter722.encode('utf-8')) + oprot.writeI32(viter723) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10683,12 +10729,12 @@ class LSSupervisorAssignments: if fid == 1: if ftype == TType.MAP: self.assignments = {} - (_ktype707, _vtype708, _size706 ) = iprot.readMapBegin() - for _i710 in xrange(_size706): - _key711 = iprot.readI32() - _val712 = LocalAssignment() - _val712.read(iprot) - self.assignments[_key711] = _val712 + (_ktype725, _vtype726, _size724 ) = iprot.readMapBegin() + for _i728 in xrange(_size724): + _key729 = iprot.readI32() + _val730 = LocalAssignment() + _val730.read(iprot) + self.assignments[_key729] = _val730 iprot.readMapEnd() else: iprot.skip(ftype) @@ -10705,9 +10751,9 @@ class LSSupervisorAssignments: if self.assignments is not None: oprot.writeFieldBegin('assignments', TType.MAP, 1) oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments)) - for kiter713,viter714 in self.assignments.items(): - oprot.writeI32(kiter713) - viter714.write(oprot) + for kiter731,viter732 in self.assignments.items(): + oprot.writeI32(kiter731) + viter732.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10780,11 +10826,11 @@ class LSWorkerHeartbeat: elif fid == 3: if ftype == TType.LIST: self.executors = [] - (_etype718, _size715) = iprot.readListBegin() - for _i719 in xrange(_size715): - _elem720 = ExecutorInfo() - _elem720.read(iprot) - self.executors.append(_elem720) + (_etype736, _size733) = iprot.readListBegin() + for _i737 in xrange(_size733): + _elem738 = ExecutorInfo() + _elem738.read(iprot) + self.executors.append(_elem738) iprot.readListEnd() else: iprot.skip(ftype) @@ -10814,8 +10860,8 @@ class LSWorkerHeartbeat: if self.executors is not None: oprot.writeFieldBegin('executors', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.executors)) - for iter721 in self.executors: - iter721.write(oprot) + for iter739 in self.executors: + iter739.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.port is not None: @@ -10901,20 +10947,20 @@ class LSTopoHistory: elif fid == 3: if ftype == TType.LIST: self.users = [] - (_etype725, _size722) = iprot.readListBegin() - for _i726 in xrange(_size722): - _elem727 = iprot.readString().decode('utf-8') - self.users.append(_elem727) + (_etype743, _size740) = iprot.readListBegin() + for _i744 in xrange(_size740): + _elem745 = iprot.readString().decode('utf-8') + self.users.append(_elem745) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.LIST: self.groups = [] - (_etype731, _size728) = iprot.readListBegin() - for _i732 in xrange(_size728): - _elem733 = iprot.readString().decode('utf-8') - self.groups.append(_elem733) + (_etype749, _size746) = iprot.readListBegin() + for _i750 in xrange(_size746): + _elem751 = iprot.readString().decode('utf-8') + self.groups.append(_elem751) iprot.readListEnd() else: iprot.skip(ftype) @@ -10939,15 +10985,15 @@ class LSTopoHistory: if self.users is not None: oprot.writeFieldBegin('users', TType.LIST, 3) oprot.writeListBegin(TType.STRING, len(self.users)) - for iter734 in self.users: - oprot.writeString(iter734.encode('utf-8')) + for iter752 in self.users: + oprot.writeString(iter752.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() if self.groups is not None: oprot.writeFieldBegin('groups', TType.LIST, 4) oprot.writeListBegin(TType.STRING, len(self.groups)) - for iter735 in self.groups: - oprot.writeString(iter735.encode('utf-8')) + for iter753 in self.groups: + oprot.writeString(iter753.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11010,11 +11056,11 @@ class LSTopoHistoryList: if fid == 1: if ftype == TType.LIST: self.topo_history = [] - (_etype739, _size736) = iprot.readListBegin() - for _i740 in xrange(_size736): - _elem741 = LSTopoHistory() - _elem741.read(iprot) - self.topo_history.append(_elem741) + (_etype757, _size754) = iprot.readListBegin() + for _i758 in xrange(_size754): + _elem759 = LSTopoHistory() + _elem759.read(iprot) + self.topo_history.append(_elem759) iprot.readListEnd() else: iprot.skip(ftype) @@ -11031,8 +11077,8 @@ class LSTopoHistoryList: if self.topo_history is not None: oprot.writeFieldBegin('topo_history', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.topo_history)) - for iter742 in self.topo_history: - iter742.write(oprot) + for iter760 in self.topo_history: + iter760.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11367,12 +11413,12 @@ class LogConfig: if fid == 2: if ftype == TType.MAP: self.named_logger_level = {} - (_ktype744, _vtype745, _size743 ) = iprot.readMapBegin() - for _i747 in xrange(_size743): - _key748 = iprot.readString().decode('utf-8') - _val749 = LogLevel() - _val749.read(iprot) - self.named_logger_level[_key748] = _val749 + (_ktype762, _vtype763, _size761 ) = iprot.readMapBegin() + for _i765 in xrange(_size761): + _key766 = iprot.readString().decode('utf-8') + _val767 = LogLevel() + _val767.read(iprot) + self.named_logger_level[_key766] = _val767 iprot.readMapEnd() else: iprot.skip(ftype) @@ -11389,9 +11435,9 @@ class LogConfig: if self.named_logger_level is not None: oprot.writeFieldBegin('named_logger_level', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.named_logger_level)) - for kiter750,viter751 in self.named_logger_level.items(): - oprot.writeString(kiter750.encode('utf-8')) - viter751.write(oprot) + for kiter768,viter769 in self.named_logger_level.items(): + oprot.writeString(kiter768.encode('utf-8')) + viter769.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11443,10 +11489,10 @@ class TopologyHistoryInfo: if fid == 1: if ftype == TType.LIST: self.topo_ids = [] - (_etype755, _size752) = iprot.readListBegin() - for _i756 in xrange(_size752): - _elem757 = iprot.readString().decode('utf-8') - self.topo_ids.append(_elem757) + (_etype773, _size770) = iprot.readListBegin() + for _i774 in xrange(_size770): + _elem775 = iprot.readString().decode('utf-8') + self.topo_ids.append(_elem775) iprot.readListEnd() else: iprot.skip(ftype) @@ -11463,8 +11509,8 @@ class TopologyHistoryInfo: if self.topo_ids is not None: oprot.writeFieldBegin('topo_ids', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.topo_ids)) - for iter758 in self.topo_ids: - oprot.writeString(iter758.encode('utf-8')) + for iter776 in self.topo_ids: + oprot.writeString(iter776.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11778,6 +11824,335 @@ class OwnerResourceSummary: def __ne__(self, other): return not (self == other) +class WorkerMetricPoint: + """ + Attributes: + - metricName + - timestamp + - metricValue + - componentId + - executorId + - streamId + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'metricName', None, None, ), # 1 + (2, TType.I64, 'timestamp', None, None, ), # 2 + (3, TType.DOUBLE, 'metricValue', None, None, ), # 3 + (4, TType.STRING, 'componentId', None, None, ), # 4 + (5, TType.STRING, 'executorId', None, None, ), # 5 + (6, TType.STRING, 'streamId', None, None, ), # 6 + ) + + def __init__(self, metricName=None, timestamp=None, metricValue=None, componentId=None, executorId=None, streamId=None,): + self.metricName = metricName + self.timestamp = timestamp + self.metricValue = metricValue + self.componentId = componentId + self.executorId = executorId + self.streamId = streamId + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.metricName = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.timestamp = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.DOUBLE: + self.metricValue = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.componentId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.executorId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.streamId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('WorkerMetricPoint') + if self.metricName is not None: + oprot.writeFieldBegin('metricName', TType.STRING, 1) + oprot.writeString(self.metricName.encode('utf-8')) + oprot.writeFieldEnd() + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 2) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.metricValue is not None: + oprot.writeFieldBegin('metricValue', TType.DOUBLE, 3) + oprot.writeDouble(self.metricValue) + oprot.writeFieldEnd() + if self.componentId is not None: + oprot.writeFieldBegin('componentId', TType.STRING, 4) + oprot.writeString(self.componentId.encode('utf-8')) + oprot.writeFieldEnd() + if self.executorId is not None: + oprot.writeFieldBegin('executorId', TType.STRING, 5) + oprot.writeString(self.executorId.encode('utf-8')) + oprot.writeFieldEnd() + if self.streamId is not None: + oprot.writeFieldBegin('streamId', TType.STRING, 6) + oprot.writeString(self.streamId.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.metricName is None: + raise TProtocol.TProtocolException(message='Required field metricName is unset!') + if self.timestamp is None: + raise TProtocol.TProtocolException(message='Required field timestamp is unset!') + if self.metricValue is None: + raise TProtocol.TProtocolException(message='Required field metricValue is unset!') + if self.componentId is None: + raise TProtocol.TProtocolException(message='Required field componentId is unset!') + if self.executorId is None: + raise TProtocol.TProtocolException(message='Required field executorId is unset!') + if self.streamId is None: + raise TProtocol.TProtocolException(message='Required field streamId is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.metricName) + value = (value * 31) ^ hash(self.timestamp) + value = (value * 31) ^ hash(self.metricValue) + value = (value * 31) ^ hash(self.componentId) + value = (value * 31) ^ hash(self.executorId) + value = (value * 31) ^ hash(self.streamId) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class WorkerMetricList: + """ + Attributes: + - metrics + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'metrics', (TType.STRUCT,(WorkerMetricPoint, WorkerMetricPoint.thrift_spec)), None, ), # 1 + ) + + def __init__(self, metrics=None,): + self.metrics = metrics + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.metrics = [] + (_etype780, _size777) = iprot.readListBegin() + for _i781 in xrange(_size777): + _elem782 = WorkerMetricPoint() + _elem782.read(iprot) + self.metrics.append(_elem782) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('WorkerMetricList') + if self.metrics is not None: + oprot.writeFieldBegin('metrics', TType.LIST, 1) + oprot.writeListBegin(TType.STRUCT, len(self.metrics)) + for iter783 in self.metrics: + iter783.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.metrics) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class WorkerMetrics: + """ + Attributes: + - topologyId + - port + - hostname + - metricList + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'topologyId', None, None, ), # 1 + (2, TType.I32, 'port', None, None, ), # 2 + (3, TType.STRING, 'hostname', None, None, ), # 3 + (4, TType.STRUCT, 'metricList', (WorkerMetricList, WorkerMetricList.thrift_spec), None, ), # 4 + ) + + def __init__(self, topologyId=None, port=None, hostname=None, metricList=None,): + self.topologyId = topologyId + self.port = port + self.hostname = hostname + self.metricList = metricList + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.topologyId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.port = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.hostname = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.metricList = WorkerMetricList() + self.metricList.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('WorkerMetrics') + if self.topologyId is not None: + oprot.writeFieldBegin('topologyId', TType.STRING, 1) + oprot.writeString(self.topologyId.encode('utf-8')) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I32, 2) + oprot.writeI32(self.port) + oprot.writeFieldEnd() + if self.hostname is not None: + oprot.writeFieldBegin('hostname', TType.STRING, 3) + oprot.writeString(self.hostname.encode('utf-8')) + oprot.writeFieldEnd() + if self.metricList is not None: + oprot.writeFieldBegin('metricList', TType.STRUCT, 4) + self.metricList.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.topologyId is None: + raise TProtocol.TProtocolException(message='Required field topologyId is unset!') + if self.port is None: + raise TProtocol.TProtocolException(message='Required field port is unset!') + if self.hostname is None: + raise TProtocol.TProtocolException(message='Required field hostname is unset!') + if self.metricList is None: + raise TProtocol.TProtocolException(message='Required field metricList is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.topologyId) + value = (value * 31) ^ hash(self.port) + value = (value * 31) ^ hash(self.hostname) + value = (value * 31) ^ hash(self.metricList) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class DRPCRequest: """ Attributes: @@ -12049,11 +12424,11 @@ class HBRecords: if fid == 1: if ftype == TType.LIST: self.pulses = [] - (_etype762, _size759) = iprot.readListBegin() - for _i763 in xrange(_size759): - _elem764 = HBPulse() - _elem764.read(iprot) - self.pulses.append(_elem764) + (_etype787, _size784) = iprot.readListBegin() + for _i788 in xrange(_size784): + _elem789 = HBPulse() + _elem789.read(iprot) + self.pulses.append(_elem789) iprot.readListEnd() else: iprot.skip(ftype) @@ -12070,8 +12445,8 @@ class HBRecords: if self.pulses is not None: oprot.writeFieldBegin('pulses', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.pulses)) - for iter765 in self.pulses: - iter765.write(oprot) + for iter790 in self.pulses: + iter790.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12123,10 +12498,10 @@ class HBNodes: if fid == 1: if ftype == TType.LIST: self.pulseIds = [] - (_etype769, _size766) = iprot.readListBegin() - for _i770 in xrange(_size766): - _elem771 = iprot.readString().decode('utf-8') - self.pulseIds.append(_elem771) + (_etype794, _size791) = iprot.readListBegin() + for _i795 in xrange(_size791): + _elem796 = iprot.readString().decode('utf-8') + self.pulseIds.append(_elem796) iprot.readListEnd() else: iprot.skip(ftype) @@ -12143,8 +12518,8 @@ class HBNodes: if self.pulseIds is not None: oprot.writeFieldBegin('pulseIds', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.pulseIds)) - for iter772 in self.pulseIds: - oprot.writeString(iter772.encode('utf-8')) + for iter797 in self.pulseIds: + oprot.writeString(iter797.encode('utf-8')) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-client/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index aff7507..c5140f3 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -672,6 +672,26 @@ struct OwnerResourceSummary { 18: optional double assigned_off_heap_memory; } +struct WorkerMetricPoint { + 1: required string metricName; + 2: required i64 timestamp; + 3: required double metricValue; + 4: required string componentId; + 5: required string executorId; + 6: required string streamId; +} + +struct WorkerMetricList { + 1: list<WorkerMetricPoint> metrics; +} + +struct WorkerMetrics { + 1: required string topologyId; + 2: required i32 port; + 3: required string hostname; + 4: required WorkerMetricList metricList; +} + service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); @@ -748,6 +768,7 @@ service Nimbus { StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze); TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: AuthorizationException aze); list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) throws (1: AuthorizationException aze); + void processWorkerMetrics(1: WorkerMetrics metrics); } struct DRPCRequest { @@ -836,3 +857,5 @@ exception HBAuthorizationException { exception HBExecutionException { 1: required string msg; } + + http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/pom.xml ---------------------------------------------------------------------- diff --git a/storm-server/pom.xml b/storm-server/pom.xml index 7c301a4..cfe2d74 100644 --- a/storm-server/pom.xml +++ b/storm-server/pom.xml @@ -64,6 +64,10 @@ <artifactId>auto-service</artifactId> <optional>true</optional> </dependency> + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + </dependency> <!-- test --> <dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index f806562..a6881b7 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -19,10 +19,10 @@ package org.apache.storm; import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger; +import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.isString; import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList; import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList; -import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; @@ -33,6 +33,7 @@ import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplic import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.metricstore.MetricStore; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; import org.apache.storm.scheduler.blacklist.reporters.IReporter; import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; @@ -55,7 +56,7 @@ import java.util.Map; public class DaemonConfig implements Validated { /** - * We check with this interval that whether the Netty channel is writable and try to write pending messages + * We check with this interval that whether the Netty channel is writable and try to write pending messages. */ @isInteger public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms"; @@ -164,7 +165,7 @@ public class DaemonConfig implements Validated { /** * The time to allow any given healthcheck script to run before it - * is marked failed due to timeout + * is marked failed due to timeout. */ @isNumber public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms"; @@ -809,7 +810,7 @@ public class DaemonConfig implements Validated { * Enables user-first classpath. See topology.classpath.beginning. */ @isBoolean - public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED="storm.topology.classpath.beginning.enabled"; + public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED = "storm.topology.classpath.beginning.enabled"; /** * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers) @@ -905,17 +906,17 @@ public class DaemonConfig implements Validated { public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs"; /** - * The plugin to be used for resource isolation + * The plugin to be used for resource isolation. */ @isImplementationOfClass(implementsClass = ResourceIsolationInterface.class) public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; /** - * CGroup Setting below + * CGroup Setting below. */ /** - * resources to to be controlled by cgroups + * resources to to be controlled by cgroups. */ @isStringList public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources"; @@ -1030,6 +1031,48 @@ public class DaemonConfig implements Validated { public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS = "storm.supervisor.medium.memory.grace.period.ms"; + /** + * Class implementing MetricStore. + */ + @NotNull + @isImplementationOfClass(implementsClass = MetricStore.class) + public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class"; + + /** + * RocksDB file location. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore + * implementation for the storm.metricstore.class. + */ + @isString + public static final String STORM_ROCKSDB_LOCATION = "storm.metricstore.rocksdb.location"; + + /** + * RocksDB create if missing flag. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore + * implementation for the storm.metricstore.class. + */ + @isBoolean + public static final String STORM_ROCKSDB_CREATE_IF_MISSING = "storm.metricstore.rocksdb.create_if_missing"; + + /** + * RocksDB metadata cache capacity. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore + * implementation for the storm.metricstore.class. + */ + @isInteger + public static final String STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY = "storm.metricstore.rocksdb.metadata_string_cache_capacity"; + + /** + * RocksDB setting for length of metric retention. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore + * implementation for the storm.metricstore.class. + */ + @isInteger + public static final String STORM_ROCKSDB_METRIC_RETENTION_HOURS = "storm.metricstore.rocksdb.retention_hours"; + + /** + * RocksDB setting for period of metric deletion thread. This setting is specific to the + * org.apache.storm.metricstore.rocksdb.RocksDbStore implementation for the storm.metricstore.class. + */ + @isInteger + public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = "storm.metricstore.rocksdb.deletion_period_hours"; + // VALIDATION ONLY CONFIGS // Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate // That they reference a valid class. To allow this to happen we do part of the validation on the client side with annotations on @@ -1051,7 +1094,7 @@ public class DaemonConfig implements Validated { } /** - * Get the cgroup resources from the conf + * Get the cgroup resources from the conf. * * @param conf the config to read * @return the resources. http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index de3053e..502f454 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -77,6 +77,7 @@ import org.apache.storm.generated.SupervisorPageInfo; import org.apache.storm.generated.TopologyHistoryInfo; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.generated.TopologyPageInfo; +import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.messaging.IContext; import org.apache.storm.messaging.local.Context; import org.apache.storm.nimbus.ILeaderElector; @@ -1123,7 +1124,12 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } - + + @Override + public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException { + getNimbus().processWorkerMetrics(metrics); + } + public static void main(final String [] args) throws Exception { if (args.length < 1) { throw new IllegalArgumentException("No class was specified to run"); http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index afb1c28..37141e8 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -124,6 +124,8 @@ import org.apache.storm.generated.TopologyInitialStatus; import org.apache.storm.generated.TopologyPageInfo; import org.apache.storm.generated.TopologyStatus; import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.WorkerMetricPoint; +import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.generated.WorkerResources; import org.apache.storm.generated.WorkerSummary; import org.apache.storm.logging.ThriftAccessLogger; @@ -132,6 +134,10 @@ import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.metric.api.DataPoint; import org.apache.storm.metric.api.IClusterMetricsConsumer; import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricStore; +import org.apache.storm.metricstore.MetricStoreConfig; import org.apache.storm.nimbus.DefaultTopologyValidator; import org.apache.storm.nimbus.ILeaderElector; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; @@ -230,6 +236,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter( "nimbus:num-getOwnerResourceSummaries-calls"); private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls"); + private static final Meter processWorkerMetricsCalls = StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls"); // END Metrics private static final String STORM_VERSION = VersionInfo.getVersion(); @@ -336,7 +343,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return Nimbus.make(base.get_prev_status()); }; - private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS = + private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS = new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, TopologyStateTransition>>() .put(TopologyStatus.ACTIVE, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>() .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION) @@ -597,7 +604,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { /** * convert {topology-id -> SchedulerAssignment} to - * {topology-id -> {executor [node port]}} + * {topology-id -> {executor [node port]}}. * @return */ private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) { @@ -879,7 +886,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } - private static StormTopology tryReadTopology(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException { + private static StormTopology tryReadTopology(String topoId, TopoCache tc) + throws NotAliveException, AuthorizationException, IOException { try { return readStormTopologyAsNimbus(topoId, tc); } catch (KeyNotFoundException e) { @@ -1001,10 +1009,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { final Nimbus nimbus = new Nimbus(conf, inimbus); nimbus.launchServer(); final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS); - Utils.addShutdownHookWithForceKillIn1Sec(() -> { + Utils.addShutdownHookWithDelayedForceKill(() -> { nimbus.shutdown(); server.stop(); - }); + }, 10); LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION); server.serve(); return nimbus; @@ -1022,6 +1030,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } private final Map<String, Object> conf; + private MetricStore metricsStore; private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths; private final NimbusInfo nimbusHostPortInfo; private final INimbus inimbus; @@ -1098,6 +1107,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception { this.conf = conf; + + this.metricsStore = null; + try { + this.metricsStore = MetricStoreConfig.configure(conf); + } catch (Exception e) { + // the metrics store is not critical to the operation of the cluster, allow Nimbus to come up + LOG.error("Failed to initialize metric store", e); + } + if (hostPortInfo == null) { hostPortInfo = NimbusInfo.fromConf(conf); } @@ -2730,7 +2748,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { // lock protects against multiple topologies being submitted at once and // cleanup thread killing topology in b/w assignment and starting the topology - synchronized(submitLock) { + synchronized (submitLock) { assertTopoActive(topoName, false); //cred-update-lock is not needed here because creds are being added for the first time. if (creds != null) { @@ -3790,7 +3808,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } Map<WorkerSlot, WorkerResources> workerResources = getWorkerResourcesForTopology(topoId); boolean isAllowed = userTopologies.contains(topoId); - for (WorkerSummary workerSummary: StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, + for (WorkerSummary workerSummary: StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, exec2NodePort, nodeToHost, workerResources, includeSys, isAllowed, sid)) { pageInfo.add_to_worker_summaries(workerSummary); } @@ -3808,7 +3826,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { @Override public ComponentPageInfo getComponentPageInfo(String topoId, String componentId, String window, - boolean includeSys) throws NotAliveException, AuthorizationException, TException { + boolean includeSys) throws NotAliveException, AuthorizationException, TException { try { getComponentPageInfoCalls.mark(); CommonTopoInfo info = getCommonTopoInfo(topoId, "getComponentPageInfo"); @@ -4004,8 +4022,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { List<NimbusSummary> nimbuses = stormClusterState.nimbuses(); NimbusInfo leader = leaderElector.getLeader(); for (NimbusSummary nimbusSummary: nimbuses) { - if (leader.getHost().equals(nimbusSummary.get_host()) && - leader.getPort() == nimbusSummary.get_port()) { + if (leader.getHost().equals(nimbusSummary.get_host()) + && leader.getPort() == nimbusSummary.get_port()) { nimbusSummary.set_uptime_secs(Time.deltaSecs(nimbusSummary.get_uptime_secs())); nimbusSummary.set_isLeader(true); return nimbusSummary; @@ -4042,7 +4060,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { //else, add only this owner (the input paramter) to the map Map<String, List<StormBase>> ownerToBasesMap = new HashMap<>(); - if (owner == null){ + if (owner == null) { // add all the owners to the map for (StormBase base: topoIdToBases.values()) { String baseOwner = base.get_owner(); @@ -4174,6 +4192,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } if (zkClient != null) { zkClient.close(); + } + if (metricsStore != null) { + metricsStore.close(); } LOG.info("Shut down master"); } catch (Exception e) { @@ -4187,4 +4208,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { public boolean isWaiting() { return timer.isTimerWaiting(); } + + @Override + public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException { + processWorkerMetricsCalls.mark(); + + checkAuthorization(null, null, "processWorkerMetrics"); + + if (this.metricsStore == null) { + return; + } + + for (WorkerMetricPoint m : metrics.get_metricList().get_metrics()) { + try { + Metric metric = new Metric(m.get_metricName(), m.get_timestamp(), metrics.get_topologyId(), + m.get_metricValue(), m.get_componentId(), m.get_executorId(), metrics.get_hostname(), + m.get_streamId(), metrics.get_port(), AggLevel.AGG_LEVEL_NONE); + this.metricsStore.insert(metric); + } catch (Exception e) { + LOG.error("Failed to save metric", e); + } + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java index ff60a4c..f45ce25 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.BufferedReader; @@ -39,11 +40,16 @@ import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.WorkerMetricPoint; +import org.apache.storm.generated.WorkerMetricList; +import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.NimbusClient; import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.ServerUtils; -import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -53,6 +59,11 @@ import org.yaml.snakeyaml.Yaml; */ public abstract class Container implements Killable { private static final Logger LOG = LoggerFactory.getLogger(Container.class); + private static final String MEMORY_USED_METRIC = "UsedMemory"; + private static final String SYSTEM_COMPONENT_ID = "System"; + private static final String INVALID_EXECUTOR_ID = "-1"; + private static final String INVALID_STREAM_ID = "None"; + public static enum ContainerType { LAUNCH(false, false), RECOVER_FULL(true, false), @@ -137,6 +148,7 @@ public abstract class Container implements Killable { protected final ResourceIsolationInterface _resourceIsolationManager; protected ContainerType _type; protected final boolean _symlinksDisabled; + private long lastMetricProcessTime = 0L; /** * Create a new Container. @@ -209,7 +221,7 @@ public abstract class Container implements Killable { } /** - * Kill a given process + * Kill a given process. * @param pid the id of the process to kill * @throws IOException */ @@ -218,7 +230,7 @@ public abstract class Container implements Killable { } /** - * Kill a given process + * Kill a given process. * @param pid the id of the process to kill * @throws IOException */ @@ -259,7 +271,7 @@ public abstract class Container implements Killable { } /** - * Is a process alive and running? + * Is a process alive and running?. * @param pid the PID of the running process * @param user the user that is expected to own that process * @return true if it is, else false @@ -381,7 +393,7 @@ public abstract class Container implements Killable { } /** - * Write out the file used by the log viewer to allow/reject log access + * Write out the file used by the log viewer to allow/reject log access. * @param user the user this is going to run as * @throws IOException on any error */ @@ -429,7 +441,7 @@ public abstract class Container implements Killable { } /** - * Create symlink from the containers directory/artifacts to the artifacts directory + * Create symlink from the containers directory/artifacts to the artifacts directory. * @throws IOException on any error */ protected void createArtifactsLink() throws IOException { @@ -693,4 +705,41 @@ public abstract class Container implements Killable { public String getWorkerId() { return _workerId; } + + /** + * Send worker metrics to Nimbus. + */ + void processMetrics() { + try { + if (_usedMemory.get(_port) != null) { + // Make sure we don't process too frequently. + long nextMetricProcessTime = this.lastMetricProcessTime + 60L * 1000L; + long currentTimeMsec = System.currentTimeMillis(); + if (currentTimeMsec < nextMetricProcessTime) { + return; + } + + String hostname = Utils.hostname(); + + // create metric for memory + long timestamp = System.currentTimeMillis(); + double value = _usedMemory.get(_port).memory; + WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID, + INVALID_EXECUTOR_ID, INVALID_STREAM_ID); + + WorkerMetricList metricList = new WorkerMetricList(); + metricList.add_to_metrics(workerMetric); + WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, hostname, metricList); + + try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) { + client.getClient().processWorkerMetrics(metrics); + } + + this.lastMetricProcessTime = currentTimeMsec; + } + } catch (Exception e) { + LOG.error("Failed to process metrics", e); + this.lastMetricProcessTime = System.currentTimeMillis(); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index cb41654..fe30c93 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -936,6 +936,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } dynamicState = dynamicState.withProfileActions(mod, modPending); } + + dynamicState.container.processMetrics(); + Time.sleep(staticState.monitorFreqMs); return dynamicState; } http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java index c1e6121..28bba3e 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java +++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.metric; import com.codahale.metrics.*; @@ -53,6 +54,10 @@ public class StormMetricsRegistry { return register(name, gauge); } + public static void registerProvidedGauge(final String name, Gauge gauge) { + register(name, gauge); + } + public static Histogram registerHistogram(String name, Reservoir reservoir) { Histogram histogram = new Histogram(reservoir); return register(name, histogram); http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java new file mode 100644 index 0000000..662a17c --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +/** + * Specifies the available timeframes for Metric aggregation. + */ +public enum AggLevel { + AGG_LEVEL_NONE(0), + AGG_LEVEL_1_MIN(1), + AGG_LEVEL_10_MIN(10), + AGG_LEVEL_60_MIN(60); + + private final byte value; + + AggLevel(int value) { + this.value = (byte)value; + } + + public byte getValue() { + return this.value; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java new file mode 100644 index 0000000..7cfbfbe --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.HashSet; +import java.util.Set; + +/** + * FilterOptions provides a method to select various filtering options for doing a scan of the metrics database. + */ +public class FilterOptions { + private Set<AggLevel> aggLevels = null; + private long startTime = 0L; + private long endTime = -1L; + private String topologyId = null; + private String componentId = null; + private String metricName = null; + private String executorId = null; + private String hostId = null; + private Integer port = null; + private String streamId = null; + + public FilterOptions() { + } + + public void setTopologyId(String topoId) { + this.topologyId = topoId; + } + + public String getTopologyId() { + return this.topologyId; + } + + public void setComponentId(String component) { + this.componentId = component; + } + + public String getComponentId() { + return this.componentId; + } + + public void setStartTime(Long time) { + this.startTime = time; + } + + public long getStartTime() { + return this.startTime; + } + + public void setEndTime(Long time) { + this.endTime = time; + } + + /** + * Returns the end time if set, returns the current time otherwise. + */ + public long getEndTime() { + if (this.endTime < 0L) { + this.endTime = System.currentTimeMillis(); + } + return this.endTime; + } + + public void setMetricName(String name) { + this.metricName = name; + } + + public String getMetricName() { + return this.metricName; + } + + public void setExecutorId(String id) { + this.executorId = id; + } + + public String getExecutorId() { + return this.executorId; + } + + public void setHostId(String id) { + this.hostId = id; + } + + public String getHostId() { + return this.hostId; + } + + public void setPort(Integer p) { + this.port = p; + } + + public Integer getPort() { + return this.port; + } + + public void setStreamId(String id) { + this.streamId = id; + } + + public String getStreamId() { + return this.streamId; + } + + /** + * Add an aggregation level to search for. + */ + public void addAggLevel(AggLevel level) { + if (this.aggLevels == null) { + this.aggLevels = new HashSet<>(1); + } + this.aggLevels.add(level); + } + + /** + * Set the aggregation levels to search for. + */ + public void setAggLevels(Set<AggLevel> levels) throws MetricException { + this.aggLevels = levels; + if (this.aggLevels == null || this.aggLevels.isEmpty()) { + throw new MetricException("Cannot search for empty AggLevel"); + } + } + + /** + * Get the aggregation levels to search for. + */ + public Set<AggLevel> getAggLevels() { + if (this.aggLevels == null) { + // assume filter choices have been made and since no selection was made, all levels are valid + this.aggLevels = new HashSet<>(4); + aggLevels.add(AggLevel.AGG_LEVEL_NONE); + aggLevels.add(AggLevel.AGG_LEVEL_1_MIN); + aggLevels.add(AggLevel.AGG_LEVEL_10_MIN); + aggLevels.add(AggLevel.AGG_LEVEL_60_MIN); + } + return this.aggLevels; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java new file mode 100644 index 0000000..716ced0 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class containing metric values and all identifying fields to be stored in a MetricStore. + */ +public class Metric implements Comparable<Metric> { + private static final Logger LOG = LoggerFactory.getLogger(Metric.class); + + // key fields + private String name; + private long timestamp; + private String topologyId; + private String componentId; + private String executorId; + private String hostname; + private String streamId; + private int port; + private AggLevel aggLevel = AggLevel.AGG_LEVEL_NONE; + + // value fields + private double value; + private long count = 1; + private double min = 0.0; + private double max = 0.0; + private double sum = 0.0; + + + /** + * Metric constructor. + */ + public Metric(String name, Long timestamp, String topologyId, double value, String componentId, String executorId, + String hostname, String streamId, int port, AggLevel aggLevel) throws MetricException { + this.name = name; + this.timestamp = timestamp; + this.topologyId = topologyId; + this.componentId = componentId; + this.executorId = executorId; + this.hostname = hostname; + this.streamId = streamId; + this.port = port; + this.setValue(value); + setAggLevel(aggLevel); + } + + /** + * A Metric constructor with the same settings cloned from another. + */ + public Metric(Metric o) { + this.name = o.getMetricName(); + this.timestamp = o.getTimestamp(); + this.topologyId = o.getTopologyId(); + this.value = o.getValue(); + this.componentId = o.getComponentId(); + this.executorId = o.getExecutorId(); + this.hostname = o.getHostname(); + this.streamId = o.getStreamId(); + this.port = o.getPort(); + this.count = o.getCount(); + this.min = o.getMin(); + this.max = o.getMax(); + this.sum = o.getSum(); + this.aggLevel = o.getAggLevel(); + } + + /** + * Check if a Metric matches another object. + */ + public boolean equals(Object other) { + + if (!(other instanceof Metric)) { + return false; + } + + Metric o = (Metric) other; + + return this == other + || (this.name.equals(o.getMetricName()) + && this.timestamp == o.getTimestamp() + && this.topologyId.equals(o.getTopologyId()) + && this.value == o.getValue() + && this.componentId.equals(o.getComponentId()) + && this.executorId.equals(o.getExecutorId()) + && this.hostname.equals(o.getHostname()) + && this.streamId.equals(o.getStreamId()) + && this.port == o.getPort() + && this.count == o.getCount() + && this.min == o.getMin() + && this.max == o.getMax() + && this.sum == o.getSum() + && this.aggLevel == o.getAggLevel()); + } + + public AggLevel getAggLevel() { + return this.aggLevel; + } + + /** + * Set the aggLevel. + */ + public void setAggLevel(AggLevel aggLevel) throws MetricException { + if (aggLevel == null) { + throw new MetricException("AggLevel not set for metric"); + } + this.aggLevel = aggLevel; + } + + /** + * Initialize the metric value. + */ + public void setValue(double value) { + this.count = 1L; + this.min = value; + this.max = value; + this.sum = value; + this.value = value; + } + + /** + * Adds an additional value to the metric. + */ + public void addValue(double value) { + this.count += 1; + this.min = Math.min(this.min, value); + this.max = Math.max(this.max, value); + this.sum += value; + this.value = this.sum / this.count; + } + + public double getSum() { + return this.sum; + } + + public void setSum(double sum) { + this.sum = sum; + } + + public long getCount() { + return this.count; + } + + public void setCount(long count) { + this.count = count; + } + + public double getMin() { + return this.min; + } + + public void setMin(double min) { + this.min = min; + } + + public double getMax() { + return this.max; + } + + public void setMax(double max) { + this.max = max; + } + + public String getTopologyId() { + return this.topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public long getTimestamp() { + return this.timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getValue() { + return this.value; + } + + public String getMetricName() { + return this.name; + } + + public String getComponentId() { + return this.componentId; + } + + public String getExecutorId() { + return this.executorId; + } + + public String getHostname() { + return this.hostname; + } + + public String getStreamId() { + return this.streamId; + } + + public Integer getPort() { + return this.port; + } + + @Override + public int compareTo(Metric o) { + long a = this.getTimestamp(); + long b = o.getTimestamp(); + return Long.compare(a, b); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + Date date = new Date(this.timestamp); + DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS"); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + sb.append(format.format(date)); + sb.append("|"); + sb.append(this.topologyId); + sb.append("|"); + sb.append(aggLevel); + sb.append("|"); + sb.append(this.name); + sb.append("|"); + sb.append(this.componentId); + sb.append("|"); + sb.append(this.executorId); + sb.append("|"); + sb.append(this.hostname); + sb.append("|"); + sb.append(this.port); + sb.append("|"); + sb.append(this.streamId); + return String.format("%s -- count: %d -- value: %f -- min: %f -- max: %f -- sum: %f", + sb.toString(), + this.count, + this.value, + this.min, + this.max, + this.sum); + } +} + http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java new file mode 100644 index 0000000..e45a451 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +/** + * A MetricException is used to describe an error using a MetricStore. + */ +public class MetricException extends Exception { + public MetricException(String message) { + super(message); + } + + public MetricException(String message, Throwable e) { + super(message, e); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java new file mode 100644 index 0000000..166333b --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.Map; + +public interface MetricStore extends AutoCloseable { + + /** + * Create metric store instance using the configurations provided via the config map. + * + * @param config Storm config map + * @throws MetricException on preparation error + */ + void prepare(Map config) throws MetricException; + + /** + * Stores a metric in the store. + * + * @param metric Metric to store + * @throws MetricException on error + */ + void insert(Metric metric) throws MetricException; + + /** + * Fill out the numeric values for a metric. + * + * @param metric Metric to populate + * @return true if the metric was populated, false otherwise + * @throws MetricException on error + */ + boolean populateValue(Metric metric) throws MetricException; + + /** + * Close the metric store. + */ + void close(); + + /** + * Scans all metrics in the store and returns the ones matching the specified filtering options. + * + * @param filter options to filter by + * @param scanCallback callback for each Metric found + * @throws MetricException on error + */ + void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException; + + /** + * Interface used to callback metrics results from a scan. + */ + interface ScanCallback { + void cb(Metric metric); + } +} + + + +
