http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/py/storm/Nimbus.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py index 1c5e86e..7313057 100644 --- a/storm-client/src/py/storm/Nimbus.py +++ b/storm-client/src/py/storm/Nimbus.py @@ -376,6 +376,13 @@ class Iface: """ pass + def getOwnerResourceSummaries(self, owner): + """ + Parameters: + - owner + """ + pass + class Client(Iface): def __init__(self, iprot, oprot=None): @@ -1927,6 +1934,39 @@ class Client(Iface): raise result.aze raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyHistory failed: unknown result") + def getOwnerResourceSummaries(self, owner): + """ + Parameters: + - owner + """ + self.send_getOwnerResourceSummaries(owner) + return self.recv_getOwnerResourceSummaries() + + def send_getOwnerResourceSummaries(self, owner): + self._oprot.writeMessageBegin('getOwnerResourceSummaries', TMessageType.CALL, self._seqid) + args = getOwnerResourceSummaries_args() + args.owner = owner + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_getOwnerResourceSummaries(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = getOwnerResourceSummaries_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + if result.aze is not None: + raise result.aze + raise TApplicationException(TApplicationException.MISSING_RESULT, "getOwnerResourceSummaries failed: unknown result") + class Processor(Iface, TProcessor): def __init__(self, handler): @@ -1977,6 +2017,7 @@ class Processor(Iface, TProcessor): self._processMap["getTopology"] = Processor.process_getTopology self._processMap["getUserTopology"] = 
Processor.process_getUserTopology self._processMap["getTopologyHistory"] = Processor.process_getTopologyHistory + self._processMap["getOwnerResourceSummaries"] = Processor.process_getOwnerResourceSummaries def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -3052,6 +3093,28 @@ class Processor(Iface, TProcessor): oprot.writeMessageEnd() oprot.trans.flush() + def process_getOwnerResourceSummaries(self, seqid, iprot, oprot): + args = getOwnerResourceSummaries_args() + args.read(iprot) + iprot.readMessageEnd() + result = getOwnerResourceSummaries_result() + try: + result.success = self._handler.getOwnerResourceSummaries(args.owner) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except AuthorizationException as aze: + msg_type = TMessageType.REPLY + result.aze = aze + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("getOwnerResourceSummaries", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -9913,3 +9976,155 @@ class getTopologyHistory_result: def __ne__(self, other): return not (self == other) + +class getOwnerResourceSummaries_args: + """ + Attributes: + - owner + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'owner', None, None, ), # 1 + ) + + def __init__(self, owner=None,): + self.owner = owner + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if 
ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('getOwnerResourceSummaries_args') + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 1) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.owner) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class getOwnerResourceSummaries_result: + """ + Attributes: + - success + - aze + """ + + thrift_spec = ( + (0, TType.LIST, 'success', (TType.STRUCT,(OwnerResourceSummary, OwnerResourceSummary.thrift_spec)), None, ), # 0 + (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1 + ) + + def __init__(self, success=None, aze=None,): + self.success = success + self.aze = aze + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if 
fid == 0: + if ftype == TType.LIST: + self.success = [] + (_etype731, _size728) = iprot.readListBegin() + for _i732 in xrange(_size728): + _elem733 = OwnerResourceSummary() + _elem733.read(iprot) + self.success.append(_elem733) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 1: + if ftype == TType.STRUCT: + self.aze = AuthorizationException() + self.aze.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('getOwnerResourceSummaries_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.LIST, 0) + oprot.writeListBegin(TType.STRUCT, len(self.success)) + for iter734 in self.success: + iter734.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.aze is not None: + oprot.writeFieldBegin('aze', TType.STRUCT, 1) + self.aze.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + value = (value * 31) ^ hash(self.aze) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index d47483f..4058f60 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -10998,6 +10998,294 @@ class TopologyHistoryInfo: def __ne__(self, other): return not (self == other) +class OwnerResourceSummary: + """ + Attributes: + - owner + - total_topologies + - total_executors + - total_workers + - memory_usage + - cpu_usage + - memory_guarantee + - cpu_guarantee + - memory_guarantee_remaining + - cpu_guarantee_remaining + - isolated_node_guarantee + - total_tasks + - requested_on_heap_memory + - requested_off_heap_memory + - requested_total_memory + - requested_cpu + - assigned_on_heap_memory + - assigned_off_heap_memory + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'owner', None, None, ), # 1 + (2, TType.I32, 'total_topologies', None, None, ), # 2 + (3, TType.I32, 'total_executors', None, None, ), # 3 + (4, TType.I32, 'total_workers', None, None, ), # 4 + (5, TType.DOUBLE, 'memory_usage', None, None, ), # 5 + (6, TType.DOUBLE, 'cpu_usage', None, None, ), # 6 + (7, TType.DOUBLE, 'memory_guarantee', None, None, ), # 7 + (8, TType.DOUBLE, 'cpu_guarantee', None, None, ), # 8 + (9, TType.DOUBLE, 'memory_guarantee_remaining', None, None, ), # 9 + (10, TType.DOUBLE, 'cpu_guarantee_remaining', None, None, ), # 10 + (11, TType.I32, 'isolated_node_guarantee', None, None, ), # 11 + (12, TType.I32, 'total_tasks', None, None, ), # 12 + (13, TType.DOUBLE, 'requested_on_heap_memory', None, None, ), # 13 + (14, TType.DOUBLE, 'requested_off_heap_memory', None, None, ), # 14 + (15, TType.DOUBLE, 'requested_total_memory', None, None, ), # 15 + (16, TType.DOUBLE, 'requested_cpu', None, None, ), # 16 + (17, TType.DOUBLE, 'assigned_on_heap_memory', None, None, ), # 17 + (18, 
TType.DOUBLE, 'assigned_off_heap_memory', None, None, ), # 18 + ) + + def __init__(self, owner=None, total_topologies=None, total_executors=None, total_workers=None, memory_usage=None, cpu_usage=None, memory_guarantee=None, cpu_guarantee=None, memory_guarantee_remaining=None, cpu_guarantee_remaining=None, isolated_node_guarantee=None, total_tasks=None, requested_on_heap_memory=None, requested_off_heap_memory=None, requested_total_memory=None, requested_cpu=None, assigned_on_heap_memory=None, assigned_off_heap_memory=None,): + self.owner = owner + self.total_topologies = total_topologies + self.total_executors = total_executors + self.total_workers = total_workers + self.memory_usage = memory_usage + self.cpu_usage = cpu_usage + self.memory_guarantee = memory_guarantee + self.cpu_guarantee = cpu_guarantee + self.memory_guarantee_remaining = memory_guarantee_remaining + self.cpu_guarantee_remaining = cpu_guarantee_remaining + self.isolated_node_guarantee = isolated_node_guarantee + self.total_tasks = total_tasks + self.requested_on_heap_memory = requested_on_heap_memory + self.requested_off_heap_memory = requested_off_heap_memory + self.requested_total_memory = requested_total_memory + self.requested_cpu = requested_cpu + self.assigned_on_heap_memory = assigned_on_heap_memory + self.assigned_off_heap_memory = assigned_off_heap_memory + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.total_topologies = iprot.readI32() + else: + 
iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.total_executors = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I32: + self.total_workers = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.DOUBLE: + self.memory_usage = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.DOUBLE: + self.cpu_usage = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.DOUBLE: + self.memory_guarantee = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.DOUBLE: + self.cpu_guarantee = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 9: + if ftype == TType.DOUBLE: + self.memory_guarantee_remaining = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 10: + if ftype == TType.DOUBLE: + self.cpu_guarantee_remaining = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 11: + if ftype == TType.I32: + self.isolated_node_guarantee = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 12: + if ftype == TType.I32: + self.total_tasks = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 13: + if ftype == TType.DOUBLE: + self.requested_on_heap_memory = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 14: + if ftype == TType.DOUBLE: + self.requested_off_heap_memory = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 15: + if ftype == TType.DOUBLE: + self.requested_total_memory = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 16: + if ftype == TType.DOUBLE: + self.requested_cpu = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 17: + if ftype == TType.DOUBLE: + self.assigned_on_heap_memory = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 18: + if ftype == TType.DOUBLE: + self.assigned_off_heap_memory = iprot.readDouble() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + 
iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('OwnerResourceSummary') + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 1) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() + if self.total_topologies is not None: + oprot.writeFieldBegin('total_topologies', TType.I32, 2) + oprot.writeI32(self.total_topologies) + oprot.writeFieldEnd() + if self.total_executors is not None: + oprot.writeFieldBegin('total_executors', TType.I32, 3) + oprot.writeI32(self.total_executors) + oprot.writeFieldEnd() + if self.total_workers is not None: + oprot.writeFieldBegin('total_workers', TType.I32, 4) + oprot.writeI32(self.total_workers) + oprot.writeFieldEnd() + if self.memory_usage is not None: + oprot.writeFieldBegin('memory_usage', TType.DOUBLE, 5) + oprot.writeDouble(self.memory_usage) + oprot.writeFieldEnd() + if self.cpu_usage is not None: + oprot.writeFieldBegin('cpu_usage', TType.DOUBLE, 6) + oprot.writeDouble(self.cpu_usage) + oprot.writeFieldEnd() + if self.memory_guarantee is not None: + oprot.writeFieldBegin('memory_guarantee', TType.DOUBLE, 7) + oprot.writeDouble(self.memory_guarantee) + oprot.writeFieldEnd() + if self.cpu_guarantee is not None: + oprot.writeFieldBegin('cpu_guarantee', TType.DOUBLE, 8) + oprot.writeDouble(self.cpu_guarantee) + oprot.writeFieldEnd() + if self.memory_guarantee_remaining is not None: + oprot.writeFieldBegin('memory_guarantee_remaining', TType.DOUBLE, 9) + oprot.writeDouble(self.memory_guarantee_remaining) + oprot.writeFieldEnd() + if self.cpu_guarantee_remaining is not None: + oprot.writeFieldBegin('cpu_guarantee_remaining', TType.DOUBLE, 10) + oprot.writeDouble(self.cpu_guarantee_remaining) + oprot.writeFieldEnd() + if 
self.isolated_node_guarantee is not None: + oprot.writeFieldBegin('isolated_node_guarantee', TType.I32, 11) + oprot.writeI32(self.isolated_node_guarantee) + oprot.writeFieldEnd() + if self.total_tasks is not None: + oprot.writeFieldBegin('total_tasks', TType.I32, 12) + oprot.writeI32(self.total_tasks) + oprot.writeFieldEnd() + if self.requested_on_heap_memory is not None: + oprot.writeFieldBegin('requested_on_heap_memory', TType.DOUBLE, 13) + oprot.writeDouble(self.requested_on_heap_memory) + oprot.writeFieldEnd() + if self.requested_off_heap_memory is not None: + oprot.writeFieldBegin('requested_off_heap_memory', TType.DOUBLE, 14) + oprot.writeDouble(self.requested_off_heap_memory) + oprot.writeFieldEnd() + if self.requested_total_memory is not None: + oprot.writeFieldBegin('requested_total_memory', TType.DOUBLE, 15) + oprot.writeDouble(self.requested_total_memory) + oprot.writeFieldEnd() + if self.requested_cpu is not None: + oprot.writeFieldBegin('requested_cpu', TType.DOUBLE, 16) + oprot.writeDouble(self.requested_cpu) + oprot.writeFieldEnd() + if self.assigned_on_heap_memory is not None: + oprot.writeFieldBegin('assigned_on_heap_memory', TType.DOUBLE, 17) + oprot.writeDouble(self.assigned_on_heap_memory) + oprot.writeFieldEnd() + if self.assigned_off_heap_memory is not None: + oprot.writeFieldBegin('assigned_off_heap_memory', TType.DOUBLE, 18) + oprot.writeDouble(self.assigned_off_heap_memory) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.owner is None: + raise TProtocol.TProtocolException(message='Required field owner is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.owner) + value = (value * 31) ^ hash(self.total_topologies) + value = (value * 31) ^ hash(self.total_executors) + value = (value * 31) ^ hash(self.total_workers) + value = (value * 31) ^ hash(self.memory_usage) + value = (value * 31) ^ hash(self.cpu_usage) + value = (value * 31) ^ 
hash(self.memory_guarantee) + value = (value * 31) ^ hash(self.cpu_guarantee) + value = (value * 31) ^ hash(self.memory_guarantee_remaining) + value = (value * 31) ^ hash(self.cpu_guarantee_remaining) + value = (value * 31) ^ hash(self.isolated_node_guarantee) + value = (value * 31) ^ hash(self.total_tasks) + value = (value * 31) ^ hash(self.requested_on_heap_memory) + value = (value * 31) ^ hash(self.requested_off_heap_memory) + value = (value * 31) ^ hash(self.requested_total_memory) + value = (value * 31) ^ hash(self.requested_cpu) + value = (value * 31) ^ hash(self.assigned_on_heap_memory) + value = (value * 31) ^ hash(self.assigned_off_heap_memory) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class DRPCRequest: """ Attributes: http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-client/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index ee02d1b..961c3cc 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -615,6 +615,27 @@ struct TopologyHistoryInfo { 1: list<string> topo_ids; } +struct OwnerResourceSummary { + 1: required string owner; + 2: optional i32 total_topologies; + 3: optional i32 total_executors; + 4: optional i32 total_workers; + 5: optional double memory_usage; + 6: optional double cpu_usage; + 7: optional double memory_guarantee; + 8: optional double cpu_guarantee; + 9: optional double memory_guarantee_remaining; + 10: optional double cpu_guarantee_remaining; + 11: optional i32 isolated_node_guarantee; + 12: optional i32 total_tasks; + 13: optional double requested_on_heap_memory; + 14: optional 
double requested_off_heap_memory; + 15: optional double requested_total_memory; + 16: optional double requested_cpu; + 17: optional double assigned_on_heap_memory; + 18: optional double assigned_off_heap_memory; +} + service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); @@ -690,6 +711,7 @@ service Nimbus { */ StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze); TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: AuthorizationException aze); + list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) throws (1: AuthorizationException aze); } struct DRPCRequest { http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index d1bdee8..7141569 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -41,7 +41,7 @@ TopologyStats CommonAggregateStats ComponentAggregateStats ComponentType BoltAggregateStats SpoutAggregateStats ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo - LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary]) + LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary OwnerResourceSummary]) (:import [org.apache.storm.security.auth AuthUtils ReqContext]) (:import [org.apache.storm.generated AuthorizationException ProfileRequest 
ProfileAction NodeInfo]) (:import [org.apache.storm.security.auth AuthUtils]) @@ -81,12 +81,11 @@ (def ui:num-activate-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-activate-topology-http-requests")) (def ui:num-deactivate-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-deactivate-topology-http-requests")) (def ui:num-debug-topology-http-requests (StormMetricsRegistry/registerMeter "ui:num-debug-topology-http-requests")) -(def ui:num-component-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-component-op-response-http-requests")) -(def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests")) -(def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests")) +(def ui:num-component-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-component-op-response-http-requests")) (def ui:num-topology-op-response-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-op-response-http-requests")) (def ui:num-main-page-http-requests (StormMetricsRegistry/registerMeter "ui:num-main-page-http-requests")) -(def ui:num-topology-lag-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-lag-http-requests")) +(def ui:num-topology-lag-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-lag-http-requests")) +(def ui:num-get-owner-resource-summaries-http-requests (StormMetricsRegistry/registerMeter "ui:num-get-owner-resource-summaries-http-request")) (defn assert-authorized-user ([op] @@ -525,6 +524,31 @@ (supervisor-summary-to-json s)) "schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)})) +(defnk get-topologies-map [summs :conditional (fn [t] true) :keys nil] + (for [^TopologySummary t summs :when (conditional t)] + (let [data {"id" (.get_id t) + "encodedId" (URLEncoder/encode (.get_id t)) + "owner" (.get_owner t) + "name" 
(.get_name t) + "status" (.get_status t) + "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs t)) + "uptimeSeconds" (.get_uptime_secs t) + "tasksTotal" (.get_num_tasks t) + "workersTotal" (.get_num_workers t) + "executorsTotal" (.get_num_executors t) + "replicationCount" (.get_replication_count t) + "schedulerInfo" (.get_sched_status t) + "requestedMemOnHeap" (.get_requested_memonheap t) + "requestedMemOffHeap" (.get_requested_memoffheap t) + "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t)) + "requestedCpu" (.get_requested_cpu t) + "assignedMemOnHeap" (.get_assigned_memonheap t) + "assignedMemOffHeap" (.get_assigned_memoffheap t) + "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t)) + "assignedCpu" (.get_assigned_cpu t) + "stormVersion" (.get_storm_version t)}] + (if (not-nil? keys) (select-keys data keys) data)))) + (defn all-topologies-summary ([] (thrift/with-configured-nimbus-connection @@ -532,30 +556,7 @@ (all-topologies-summary (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus))))) ([summs] - {"topologies" - (for [^TopologySummary t summs] - { - "id" (.get_id t) - "encodedId" (URLEncoder/encode (.get_id t)) - "owner" (.get_owner t) - "name" (.get_name t) - "status" (.get_status t) - "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs t)) - "uptimeSeconds" (.get_uptime_secs t) - "tasksTotal" (.get_num_tasks t) - "workersTotal" (.get_num_workers t) - "executorsTotal" (.get_num_executors t) - "replicationCount" (.get_replication_count t) - "schedulerInfo" (.get_sched_status t) - "requestedMemOnHeap" (.get_requested_memonheap t) - "requestedMemOffHeap" (.get_requested_memoffheap t) - "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t)) - "requestedCpu" (.get_requested_cpu t) - "assignedMemOnHeap" (.get_assigned_memonheap t) - "assignedMemOffHeap" (.get_assigned_memoffheap t) - "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t)) - 
"assignedCpu" (.get_assigned_cpu t) - "stormVersion" (.get_storm_version t)}) + {"topologies" (get-topologies-map summs) "schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)})) (defn topology-stats [window stats] @@ -1073,7 +1074,62 @@ "profilerActive" (if (*STORM-CONF* WORKER-PROFILER-ENABLED) (get-active-profile-actions nimbus topology-id component) []))))) - + +(defn unpack-owner-resource-summary [summary] + (let [memory-guarantee (if (.is_set_memory_guarantee summary) + (.get_memory_guarantee summary) + "N/A") + cpu-guarantee (if (.is_set_cpu_guarantee summary) + (.get_cpu_guarantee summary) + "N/A") + isolated-node-guarantee (if (.is_set_isolated_node_guarantee summary) + (.get_isolated_node_guarantee summary) + "N/A") + memory-guarantee-remaining (if (.is_set_memory_guarantee_remaining summary) + (.get_memory_guarantee_remaining summary) + "N/A") + cpu-guarantee-remaining (if (.is_set_cpu_guarantee_remaining summary) + (.get_cpu_guarantee_remaining summary) + "N/A")] + {"owner" (.get_owner summary) + "totalTopologies" (.get_total_topologies summary) + "totalExecutors" (.get_total_executors summary) + "totalWorkers" (.get_total_workers summary) + "totalTasks" (.get_total_tasks summary) + "totalMemoryUsage" (.get_memory_usage summary) + "totalCpuUsage" (.get_cpu_usage summary) + "memoryGuarantee" memory-guarantee + "cpuGuarantee" cpu-guarantee + "isolatedNodes" isolated-node-guarantee + "memoryGuaranteeRemaining" memory-guarantee-remaining + "cpuGuaranteeRemaining" cpu-guarantee-remaining + "totalReqOnHeapMem" (.get_requested_on_heap_memory summary) + "totalReqOffHeapMem" (.get_requested_off_heap_memory summary) + "totalReqMem" (.get_requested_total_memory summary) + "totalReqCpu" (.get_requested_cpu summary) + "totalAssignedOnHeapMem" (.get_assigned_on_heap_memory summary) + "totalAssignedOffHeapMem" (.get_assigned_off_heap_memory summary)})) + +(defn owner-resource-summaries [] + (thrift/with-configured-nimbus-connection nimbus + (let 
[summaries (.getOwnerResourceSummaries nimbus nil)] + {"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE) + "owners" + (for [summary summaries] + (unpack-owner-resource-summary summary))}))) + +(defn owner-resource-summary [owner] + (thrift/with-configured-nimbus-connection nimbus + (let [summaries (.getOwnerResourceSummaries nimbus owner)] + (merge {"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)} + (if (empty? summaries) + ;; send a default value, we couldn't find topos by that owner + (unpack-owner-resource-summary (OwnerResourceSummary. owner)) + (let [topologies (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)) + data (get-topologies-map topologies :conditional (fn [t] (= (.get_owner t) owner)))] + (merge {"topologies" data} + (unpack-owner-resource-summary (first summaries))))))))) + (defn- level-to-dict [level] (if level (let [timeout (.get_reset_log_level_timeout_secs level) @@ -1154,6 +1210,16 @@ (populate-context! servlet-request) (assert-authorized-user "getClusterInfo") (json-response (nimbus-summary) (:callback m))) + (GET "/api/v1/owner-resources" [:as {:keys [cookies servlet-request scheme]} id & m] + (.mark ui:num-get-owner-resource-summaries-http-requests) + (populate-context! servlet-request) + (assert-authorized-user "getOwnerResourceSummaries") + (json-response (owner-resource-summaries) (:callback m))) + (GET "/api/v1/owner-resources/:id" [:as {:keys [cookies servlet-request scheme]} id & m] + (.mark ui:num-get-owner-resource-summaries-http-requests) + (populate-context! 
servlet-request) + (assert-authorized-user "getOwnerResourceSummaries") + (json-response (owner-resource-summary id) (:callback m))) (GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m] (let [user (.getUserName http-creds-handler servlet-request)] (json-response (topology-history-info user) (:callback m)))) http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/css/style.css ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/css/style.css b/storm-core/src/ui/public/css/style.css index 45a3982..b8c5c61 100644 --- a/storm-core/src/ui/public/css/style.css +++ b/storm-core/src/ui/public/css/style.css @@ -236,3 +236,28 @@ div#visualization summary { display: list-item; outline: none; } + +.warning { + width: 100%; + height: 150px; + display: none; +} + +#ras-warning-fixed-buffer { + background: #FF0000; + float:left; + position:fixed; + z-index:999999 !important; + text-align:center; + color:#FFFFFF; +} + +.resource-guarantee-remaining-positive { + color: green; + font-weight: bold; +} + +.resource-guarantee-remaining-negative { + color: red; + font-weight: bold; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/index.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index 69081d4..00693b5 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -67,6 +67,12 @@ </div> <div class="row"> <div class="col-md-12"> + <h2>Owner Summary</h2> + <div id="owner-summary"></div> + </div> + </div> + <div class="row"> + <div class="col-md-12"> <h2>Topology Summary</h2> <div id="topology-summary"></div> </div> @@ -115,6 +121,7 @@ $(document).ready(function() { var clusterSummary = $("#cluster-summary"); var clusterResources = $("#cluster-resources"); var nimbusSummary 
= $("#nimbus-summary"); + var ownerSummary = $("#owner-summary"); var topologySummary = $("#topology-summary"); var supervisorSummary = $("#supervisor-summary"); var config = $("#nimbus-configuration"); @@ -152,6 +159,11 @@ $(document).ready(function() { $('#nimbus-summary [data-toggle="tooltip"]').tooltip(); }); + $.getJSON("/api/v1/owner-resources", function(response, status, jqXHR) { + ownerSummary.append(Mustache.render($(indexTemplate).filter("#owner-summary-template").html(), response)); + makeOwnerSummaryTable(response, '#owner-summary-table', '#owner-summary'); + }); + $.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) { topologySummary.append(Mustache.render($(indexTemplate).filter("#topology-summary-template").html(),response)); //name, owner, status, uptime, num workers, num executors, num tasks, replication count, assigned total mem, assigned total cpu, scheduler info http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/js/script.js ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js index a7d5deb..2ed1395 100644 --- a/storm-core/src/ui/public/js/script.js +++ b/storm-core/src/ui/public/js/script.js @@ -510,3 +510,74 @@ function jsError(other) { }); } } + +function getResourceGuaranteeRemainingFormat(type, data) { + if (type === 'display' && typeof data === "number") { + var resourceGuaranteeRemaining = parseFloat(data); + if (resourceGuaranteeRemaining > 0.0) { + return '<span class="resource-guarantee-remaining-positive">+' + data + '</span>' + } + if (resourceGuaranteeRemaining < 0.0) { + return '<span class="resource-guarantee-remaining-negative">' + data + '</span>' + } + } + return data; +} + +var makeOwnerSummaryTable = function(response, elId, parentId) { + var showCpu = response.schedulerDisplayResource; + + var columns = [ + { + data: 'owner', + render: function(data, type, row) { + 
return type === 'display' ? + ('<a href="/owner.html?id=' + data + '">' + data + '</a>') : + data; + } + }, { + data: 'totalTopologies', + }, { + data: 'totalExecutors', + }, { + data: 'totalWorkers', + }, { + data: 'totalMemoryUsage', + }]; + + if (showCpu) { + columns.push({ + data: 'memoryGuarantee' + }); + columns.push({ + data: 'memoryGuaranteeRemaining', + render: function(data, type, row) { + return getResourceGuaranteeRemainingFormat(type, data); + } + }); + columns.push({ + data: 'totalCpuUsage' + }); + columns.push({ + data: 'cpuGuarantee' + }); + columns.push({ + data: 'cpuGuaranteeRemaining', + render: function(data, type, row) { + return getResourceGuaranteeRemainingFormat(type, data); + } + }); + columns.push({ + data: 'isolatedNodes' + }); + } + + var userSummaryTable = dtAutoPage(elId, { + data: response.owners, + autoWidth: false, + columns: columns, + }); + + $(elId + ' [data-toggle="tooltip"]').tooltip(); +}; + http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/owner.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/owner.html b/storm-core/src/ui/public/owner.html new file mode 100644 index 0000000..6365123 --- /dev/null +++ b/storm-core/src/ui/public/owner.html @@ -0,0 +1,205 @@ +<html> + +<head> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <meta charset="UTF-8"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <title>Storm UI</title> + <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css"> + <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css"> + <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css"> + <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css"> + <link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css"> + <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script> + <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script> + <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script> + <script src="/js/jquery.mustache.js" type="text/javascript"></script> + <script src="/js/url.min.js" type="text/javascript"></script> + <script src="/js/bootstrap-3.3.1.min.js" type="text/javascript"></script> + <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script> + <script src="/js/jsonFormatter.min.js" type="text/javascript"></script> + <script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script> +</head> + +<body> + <div class="warning" id="ras-warning-fixed-buffer"> + <H1>This user's topologies are in danger of being unscheduled due to the owner's over-use of cluster resources.</H1> + <p>Please keep this user's resource consumption within guaranteed bounds to ensure topologies for this user will continue to run.</p> + </div> + <div class="warning" 
id="ras-warning-top-buffer"></div> + <div class="container-fluid"> + <div class="row"> + <div class="col-md-11"> + <h1><a href="/">Storm UI</a></h1> + </div> + <div id="ui-user" class="col-md-1"></div> + </div> + <div class="row"> + <div class="col-md-12" id="search-form"> + </div> + </div> + <div class="row"> + <div class="col-md-12"> + <h2>Owner Summary</h2> + <div id="owner-summary"></div> + </div> + </div> + <div class="row"> + <div class="col-md-12"> + <h2 id="owner-resource-usage-summary-header">Owner Resource Usage</h2> + <div id="owner-resource-usage-summary"></div> + </div> + </div> + <div class="row"> + <div class="col-md-12"> + <h2 id="owner-resource-guarantee-summary-header">Owner Resource Guarantees (RAS Topologies Only)</h2> + <div id="owner-resource-guarantee-summary"></div> + </div> + </div> + <div class="row"> + <div class="col-md-12"> + <h2>Owner Topologies</h2> + <div id="topology-summary"></div> + </div> + </div> + <div class="row"> + <div id="json-response-error" class="col-md-12"></div> + </div> +</div> +</body> +<script> + $(document).ajaxStop($.unblockUI); + $(document).ajaxStart(function() { + $.blockUI({ + message: '<img src="images/spinner.gif" /> <h3>Loading summary...</h3>' + }); + }); + $(document).ready(function() { + + var owner = $.url("?id"); + if (!owner) { + // this page is designed to be per owner, handle the case where the URL is malformed + getStatic("/templates/json-error-template.html", function(template) { + $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), { + "error": "Invalid owner", + "errorMessage": "Please pass an owner id with the id URL parameter" + })); + }); + return; + } + var url = "/api/v1/owner-resources/" + owner; + + $.extend($.fn.dataTable.defaults, { + stateSave: true, + lengthMenu: [ + [20, 40, 60, 100, -1], + [20, 40, 60, 100, "All"] + ], + pageLength: 20 + }); + + $.ajaxSetup({ + "error": function(jqXHR, textStatus, response) { + var errorJson = 
jQuery.parseJSON(jqXHR.responseText); + getStatic("/templates/json-error-template.html", function(template) { + $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), errorJson)); + }); + } + }); + + function jsError(other) { + try { + other(); + } catch (err) { + getStatic("/templates/json-error-template.html", function(template) { + $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(), { + error: "JS Error", + errorMessage: err + })); + }); + } + } + + var topologySummary = $("#topology-summary"); + var ownerResourceUsage = $("#owner-resource-usage-summary"); + var ownerSummary = $("#owner-summary"); + var ownerResourceGuarantee = $("#owner-resource-guarantee-summary"); + $.getJSON(url, function(response, status, jqXHR) { + getStatic("/templates/owner-page-template.html", function(template) { + jsError(function() { + //owner,totalTopologies,totalTasks,totalExecutors,totalWorkers + ownerSummary.append( + Mustache.render($(template).filter("#owner-summary-template").html(), response)); + + //totalReqOnHeapMem,totalReqOffHeapMem,totalReqMem,totalReqCpu,totalAssignedOnHeapMem,totalAssignedOffHeapMem,totalAssignedMem,totalAssignedCpu + ownerResourceUsage.append( + Mustache.render($(template).filter("#owner-resource-usage-template").html(), response)); + $('#owner-resource-usage-summary [data-toggle="tooltip"]').tooltip(); + + if (response["cpuGuarantee"] != "N/A" || response["memoryGuarantee"] != "N/A") { + ownerResourceGuarantee.append( + Mustache.render($(template).filter("#owner-resource-guarantee-template").html(), response)); + $('#owner-resource-guarantee-summary [data-toggle="tooltip"]').tooltip(); + + $('#mem-guarantee-util').html(getResourceGuaranteeRemainingFormat("display", response["memoryGuaranteeRemaining"])); + + $('#cpu-guarantee-util').html(getResourceGuaranteeRemainingFormat("display", response["cpuGuaranteeRemaining"])); + + } else { + 
$('#owner-resource-guarantee-summary-header').hide(); + $('#owner-resource-guarantee-summary').hide(); + } + + var displayResource = response["schedulerDisplayResource"]; + if (!displayResource) { + $('#owner-resource-usage-summary-header').hide(); + $('#owner-resource-usage-summary').hide(); + $('#owner-resource-guarantee-summary-header').hide(); + $('#owner-resource-guarantee-summary').hide(); + } + + if(response && (response["memoryGuaranteeRemaining"] < 0 || response["cpuGuaranteeRemaining"] < 0 + || response["memoryGuaranteeRemaining"] == "N/A" || response["cpuGuaranteeRemaining"] == "N/A")) { + $(".warning").show(); + } else { + $(".warning").hide(); + } + + $('#owner-resource-usage-summary [data-toggle="tooltip"]').tooltip(); + + topologySummary.append( + Mustache.render($(template).filter("#owner-topology-summary-template").html(), response)); + //name, owner, status, uptime, num workers, num executors, num tasks, assigned total mem, assigned total cpu, scheduler info + dtAutoPage("#owner-topology-summary-table", { + columnDefs: [{ + type: "num", + targets: [4, 5, 6, 7, 8] + }, { + type: "time-str", + targets: [3] + }] + }); + $('#topology-summary [data-toggle="tooltip"]').tooltip(); + + }); + }); + }); + }); +</script> + +</html> http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/index-page-template.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html index 1802452..2c46499 100644 --- a/storm-core/src/ui/public/templates/index-page-template.html +++ b/storm-core/src/ui/public/templates/index-page-template.html @@ -152,6 +152,71 @@ </tbody> </table> </script> +<script id="owner-summary-template" type="text/html"> + <table class="table table-striped compact" id="owner-summary-table"> + <thead> + <tr> + <th> + Owner + </th> + <th> + <span data-toggle="tooltip" 
data-placement="top" title="Total number of topologies owned by user."> + Total Topologies + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Total number of executors used by user."> + Total Executors + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Total number of workers used by user."> + Total Workers + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) used by user."> + Memory Usage (MB) + </span> + </th> + {{#schedulerDisplayResource}} + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) guaranteed to user."> + Memory Guarantee (MB) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed memory resources (in MB) remaining."> + Memory Guarantee Remaining (MB) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Total CPU Resource Assigned on behalf of Owner. 
Every 100 means 1 core."> + CPU Usage (%) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) guaranteed to user."> + CPU Guarantee (%) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed CPU resource (every 100 means 1 core) remaining."> + CPU Guarantee Remaining (%) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of isolated nodes user may use"> + Isolated Nodes Guarantee + </span> + </th> + {{/schedulerDisplayResource}} + </tr> + </thead> + <tbody> + </tbody> + </table> +</script> <script id="topology-summary-template" type="text/html"> <table class="table table-striped compact" id="topology-summary-table"> <thead> @@ -224,7 +289,7 @@ {{#topologies}} <tr> <td><a href="/topology.html?id={{encodedId}}">{{name}}</a></td> - <td>{{owner}}</td> + <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td> <td>{{status}}</td> <td>{{uptime}}</td> <td>{{workersTotal}}</td> http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/owner-page-template.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/templates/owner-page-template.html b/storm-core/src/ui/public/templates/owner-page-template.html new file mode 100644 index 0000000..f97bf05 --- /dev/null +++ b/storm-core/src/ui/public/templates/owner-page-template.html @@ -0,0 +1,233 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<script id="owner-summary-template" type="text/html"> + <table id="owner-summary-table" class="table compact"> + <thead> + <tr> + <th> + Owner + </th> + <th> + <span data-toggle="tooltip" data-placement="above" title="Total number of topologies owned by owner."> + Total Topologies + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="above" title="Total number of tasks used by owner."> + Total Tasks + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="above" title="Total number of executors used by owner."> + Total Executors + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="above" title="Total number of workers used by owner."> + Total Workers + </span> + </th> + </tr> + </thead> + <tbody> + <tr> + <td>{{owner}}</td> + <td>{{totalTopologies}}</td> + <td>{{totalTasks}}</td> + <td>{{totalExecutors}}</td> + <td>{{totalWorkers}}</td> + </tr> + </tbody> + </table> +</script> + +<script id="owner-resource-usage-template" type="text/html"> + <table id="owner-resource-usage-table" class="table compact"> + <thead> + <tr> + <th> + </th> + <th> + On-Heap Mem (MB) + </th> + <th> + Off-Heap Mem (MB) + </th> + <th> + Total Mem (MB) + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Total CPU Resource. 
Every 100 means 1 core."> + Total CPU (%) + </span> + </th> + </tr> + </thead> + <tbody> + <tr> + <td>Requested</td> + <td>{{totalReqOnHeapMem}}</td> + <td>{{totalReqOffHeapMem}}</td> + <td>{{totalReqMem}}</td> + <td>{{totalReqCpu}}</td> + </tr> + <tr> + <td>Assigned</td> + <td>{{totalAssignedOnHeapMem}}</td> + <td>{{totalAssignedOffHeapMem}}</td> + <td>{{totalMemoryUsage}}</td> + <td>{{totalCpuUsage}}</td> + </tr> + </tbody> + </table> +</script> + +<script id="owner-resource-guarantee-template" type="text/html"> + <table id="owner-resource-guarantee-table" class="table compact"> + <thead> + <tr> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) guaranteed to owner."> + CPU Guarantee (%) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of CPU resource (every 100 means 1 core) used by owner."> + CPU Usage (%) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed CPU resource (every 100 means 1 core) remaining."> + CPU Guarantee Remaining (%) + </span> + </th> + </tr> + </thead> + <tbody> + <tr> + <td>{{cpuGuarantee}}</td> + <td>{{totalCpuUsage}}</td> + <td id="cpu-guarantee-util">{{cpuGuaranteeRemaining}}</td> + </tr> + </tbody> + <thead> + <tr> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) guaranteed to owner."> + Memory Guarantee (MB) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of memory resource (in MB) used by owner."> + Memory Usage (MB) + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The amount of guaranteed memory resources (in MB) remaining."> + Memory Guarantee Remaining (MB) + </span> + </th> + </tr> + </thead> + <tbody> + <tr> + <td>{{memoryGuarantee}}</td> + <td>{{totalMemoryUsage}}</td> + <td id="mem-guarantee-util">{{memoryGuaranteeRemaining}}</td> 
+ </tr> + </tbody> + </table> +</script> + +<script id="owner-topology-summary-template" type="text/html"> + <table id="owner-topology-summary-table" class="table table-striped compact"> + <thead> + <tr> + <th> + <span data-toggle="tooltip" data-placement="top" title="The name given to the topology when it was submitted. Click the name to view the Topology's information."> + Name + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The owner that submitted the Topology, if authentication is enabled."> + Owner + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING."> + Status + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The time since the Topology was submitted."> + Uptime + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="The number of Workers (processes)."> + Num workers + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Executors are threads in a Worker process."> + Num executors + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors."> + Num tasks + </span> + </th> + <th> + <span data-toggle="tooltip" data-placement="top" title="Assigned Total Memory by Scheduler."> + Assigned Mem (MB) + </span> + </th> + {{#schedulerDisplayResource}} + <th> + <span data-toggle="tooltip" data-placement="top" title="Assigned Total CPU by Scheduler. 
Every 100 means 1 core."> + Assigned CPU (%) + </span> + </th> + {{/schedulerDisplayResource}} + <th> + <span data-toggle="tooltip" data-placement="left" title="This shows information from the scheduler about the latest attempt to schedule the Topology on the cluster."> + Scheduler Info + </span> + </th> + </tr> + </thead> + <tbody> + {{#topologies}} + <tr> + <td><a href="/topology.html?id={{encodedId}}">{{name}}</a></td> + <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td> + <td>{{status}}</td> + <td>{{uptime}}</td> + <td>{{workersTotal}}</td> + <td>{{executorsTotal}}</td> + <td>{{tasksTotal}}</td> + <td>{{assignedTotalMem}}</td> + {{#schedulerDisplayResource}} + <td>{{assignedCpu}}</td> + {{/schedulerDisplayResource}} + <td>{{schedulerInfo}}</td> + </tr> + {{/topologies}} + </tbody> + </table> +</script> http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/src/ui/public/templates/topology-page-template.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html index 16a4f97..a2e6201 100644 --- a/storm-core/src/ui/public/templates/topology-page-template.html +++ b/storm-core/src/ui/public/templates/topology-page-template.html @@ -91,7 +91,7 @@ <tr> <td>{{name}}</td> <td>{{id}}</td> - <td>{{owner}}</td> + <td><a href="/owner.html?id={{owner}}">{{owner}}</a></td> <td>{{status}}</td> <td>{{uptime}}</td> <td>{{workersTotal}}</td> http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index bfa8bd6..c01babb 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -107,7 +107,7 
@@ .get_executor_node_port .values (map (fn [np] (.get_node np))) - set + set ))) (defn topology-slots [state storm-name] @@ -116,7 +116,7 @@ (->> assignment .get_executor_node_port .values - set + set ))) ;TODO: when translating this function, don't call map-val, but instead use an inline for loop. @@ -131,7 +131,7 @@ (group-by (fn [np] (.get_node np))) (map-val count) (map (fn [[_ amt]] {amt 1})) - (apply merge-with +) + (apply merge-with +) ))) (defn topology-num-nodes [state storm-name] @@ -213,7 +213,7 @@ (is (not-nil? (.get task->node+port t))))) (doseq [[e s] executor->node+port] (is (not-nil? s))) - + (is (= all-nodes (set (keys (.get_node_host assignment))))) (doseq [[e s] executor->node+port] (is (not-nil? (.get (.get_executor_start_time_secs assignment) e)))) @@ -221,7 +221,7 @@ (deftest test-bogusId (with-open [cluster (.build (doto (LocalCluster$Builder. ) - (.withSupervisors 4) + (.withSupervisors 4) (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))] (let [state (.getClusterState cluster) nimbus (.getNimbus cluster)] @@ -235,7 +235,7 @@ (deftest test-assignment (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withSimulatedTime) - (.withSupervisors 4) + (.withSupervisors 4) (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))] (let [state (.getClusterState cluster) topology (Thrift/buildTopology @@ -952,6 +952,70 @@ (is (not= (executor->start t) (executor->start2 t)))) ))) +(deftest test-get-owner-resource-summaries + (with-open [cluster (.build (doto (LocalCluster$Builder. 
)
+                                (.withSimulatedTime)
+                                (.withSupervisors 1)
+                                (.withPortsPerSupervisor 12)
+                                (.withDaemonConf
+                                  {SUPERVISOR-ENABLE false
+                                   NIMBUS-MONITOR-FREQ-SECS 10
+                                   TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                   TOPOLOGY-ACKER-EXECUTORS 0
+                                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0
+                                   })))]
+    (letlocals
+      ;; 0-topology case: with nothing submitted, no owner summary is reported
+      (.advanceClusterTime cluster 11)
+      (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+      (bind summary (first owner-resource-summaries))
+      (is (nil? summary))
+
+      ;; 1-topology case: one spout with parallelism 3 spread over 3 workers
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 3))}
+                       {}))
+      (.submitTopology cluster
+                       "test"
+                       {TOPOLOGY-WORKERS 3
+                        TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
+      (.advanceClusterTime cluster 11)
+
+      (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+      (bind summary (first owner-resource-summaries))
+      (is (= (.get_total_workers summary) 3))
+      (is (= (.get_total_executors summary) 3))
+      (is (= (.get_total_topologies summary) 1))
+
+      ;; many-topology case: totals are summed across all topologies of the owner
+      (bind topology2 (Thrift/buildTopology
+                        {"2" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 4))}
+                        {}))
+      (bind topology3 (Thrift/buildTopology
+                        {"3" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 5))}
+                        {}))
+
+      (.submitTopology cluster
+                       "test2"
+                       {TOPOLOGY-WORKERS 4
+                        TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
+
+      (.submitTopology cluster
+                       "test3"
+                       {TOPOLOGY-WORKERS 3
+                        TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
+      (.advanceClusterTime cluster 11)
+
+      (bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
+      (bind summary (first owner-resource-summaries))
+      (is (= (.get_total_workers summary) 10))
+      (is (= (.get_total_executors summary) 12))
+      (is (= (.get_total_topologies summary) 3))
+      )))
+
 (deftest test-rebalance
   (with-open [cluster (.build (doto (LocalCluster$Builder. 
) (.withSimulatedTime) @@ -1073,9 +1137,9 @@ node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [np node+ports] {(.get_node np) [(first (.get_port np))]}))]] {id node->ports})) _ (log-message "id->node->ports: " id->node->ports) - all-nodes (apply merge-with (fn [a b] + all-nodes (apply merge-with (fn [a b] (let [ret (concat a b)] - (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret)) + (log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret)) (is (apply distinct? ret)) (distinct ret))) (.values id->node->ports))] @@ -1252,7 +1316,7 @@ (.submitTopology nimbus "t1" nil "{}" topology) ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. (.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0))) - (wait-for-status nimbus "t1" "ACTIVE") + (wait-for-status nimbus "t1" "ACTIVE") (.deactivate nimbus "t1") (.activate nimbus "t1") (.rebalance nimbus "t1" (RebalanceOptions.)) @@ -1302,7 +1366,7 @@ (deftest test-nimbus-iface-methods-check-authorization (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1321,7 +1385,7 @@ (deftest test-nimbus-check-authorization-params (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. 
) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1333,7 +1397,7 @@ topology (Thrift/buildTopology {} {}) expected-name topology-name expected-conf {TOPOLOGY-NAME expected-name - "foo" "bar"}] + "foo" "bar"}] (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id)) (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf) (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil) @@ -1342,7 +1406,7 @@ (try (is (= expected-conf (->> (.getTopologyConf nimbus topology-id) - JSONValue/parse + JSONValue/parse clojurify-structure))) (catch NotAliveException e) (finally @@ -1378,7 +1442,7 @@ (deftest test-check-authorization-getSupervisorPageInfo (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1400,7 +1464,7 @@ (.set_state_spouts {})) topo-assignment {expected-name assignment} check-auth-state (atom []) - mock-check-authorization (fn [nimbus storm-name storm-conf operation] + mock-check-authorization (fn [nimbus storm-name storm-conf operation] (swap! 
check-auth-state conj {:nimbus nimbus :storm-name storm-name :storm-conf storm-conf @@ -1415,7 +1479,7 @@ (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology) (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment) (.getSupervisorPageInfo nimbus "super1" nil true) - + ;; afterwards, it should get called twice (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getSupervisorPageInfo")) (.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo") @@ -1468,7 +1532,7 @@ (deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withClusterState cluster-state) (.withBlobStore blob-store)))] @@ -1495,7 +1559,7 @@ (.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases) (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf) (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology) - + (let [topos (.get_topologies (.getClusterInfo nimbus))] ; The number of topologies in the summary is correct. (is (= (count @@ -1549,7 +1613,7 @@ )))) (deftest test-file-bogus-download - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))] (let [nimbus (.getNimbus cluster)] @@ -1561,7 +1625,7 @@ (deftest test-validate-topo-config-on-submit (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. 
) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1674,7 +1738,7 @@ (deftest empty-save-config-results-in-all-unchanged-actions (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1684,26 +1748,26 @@ mock-config (LogConfig.) expected-config (LogConfig.)] ;; send something with content to nimbus beforehand - (.put_to_named_logger_level previous-config "test" + (.put_to_named_logger_level previous-config "test" (doto (LogLevel.) (.set_target_log_level "ERROR") (.set_action LogLevelAction/UPDATE))) - (.put_to_named_logger_level expected-config "test" + (.put_to_named_logger_level expected-config "test" (doto (LogLevel.) (.set_target_log_level "ERROR") (.set_action LogLevelAction/UNCHANGED))) (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {}) (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config) - + (.setLogConfig nimbus "foo" mock-config) (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config)))))) (deftest log-level-update-merges-and-flags-existent-log-level (let [cluster-state (Mockito/mock IStormClusterState) blob-store (Mockito/mock BlobStore)] - (with-open [cluster (.build + (with-open [cluster (.build (doto (LocalCluster$Builder. ) (.withClusterState cluster-state) (.withBlobStore blob-store) @@ -1713,36 +1777,36 @@ mock-config (LogConfig.) expected-config (LogConfig.)] ;; send something with content to nimbus beforehand - (.put_to_named_logger_level previous-config "test" + (.put_to_named_logger_level previous-config "test" (doto (LogLevel.) 
(.set_target_log_level "ERROR") (.set_action LogLevelAction/UPDATE))) - (.put_to_named_logger_level previous-config "other-test" + (.put_to_named_logger_level previous-config "other-test" (doto (LogLevel.) (.set_target_log_level "DEBUG") (.set_action LogLevelAction/UPDATE))) ;; only change "test" - (.put_to_named_logger_level mock-config "test" + (.put_to_named_logger_level mock-config "test" (doto (LogLevel.) (.set_target_log_level "INFO") (.set_action LogLevelAction/UPDATE))) - (.put_to_named_logger_level expected-config "test" + (.put_to_named_logger_level expected-config "test" (doto (LogLevel.) (.set_target_log_level "INFO") (.set_action LogLevelAction/UPDATE))) - (.put_to_named_logger_level expected-config "other-test" + (.put_to_named_logger_level expected-config "other-test" (doto (LogLevel.) (.set_target_log_level "DEBUG") (.set_action LogLevelAction/UNCHANGED))) (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {}) (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config) - + (.setLogConfig nimbus "foo" mock-config) (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config)))))) @@ -1750,11 +1814,11 @@ (defn teardown-topo-errors [id]) (defn teardown-backpressure-dirs [id]) -(defn mock-cluster-state - ([] +(defn mock-cluster-state + ([] (mock-cluster-state nil nil)) ([active-topos inactive-topos] - (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos)) + (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos)) ([active-topos hb-topos error-topos bp-topos] (reify IStormClusterState (teardownHeartbeats [this id] (teardown-heartbeats id)) @@ -1790,7 +1854,7 @@ mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos) store (Mockito/mock BlobStore)] (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) - (is (= (Nimbus/topoIdsToClean 
mock-state store) + (is (= (Nimbus/topoIdsToClean mock-state store) #{})))) (deftest do-cleanup-removes-inactive-znodes @@ -1805,8 +1869,8 @@ (.set (.getHeartbeatsCache nimbus) hb-cache) (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos)) (mocking - [teardown-heartbeats - teardown-topo-errors + [teardown-heartbeats + teardown-topo-errors teardown-backpressure-dirs] (.doCleanup nimbus) @@ -1850,8 +1914,8 @@ (.set (.getHeartbeatsCache nimbus) hb-cache) (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos)) (mocking - [teardown-heartbeats - teardown-topo-errors + [teardown-heartbeats + teardown-topo-errors teardown-backpressure-dirs] (.doCleanup nimbus) @@ -1883,7 +1947,7 @@ supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2")) user2-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor2-topologies))] (is (= (list "topo1") supervisor1-topologies)) - (is (= #{"topo1"} user1-topologies)) + (is (= #{"topo1"} user1-topologies)) (is (= (list "topo1" "topo2") supervisor2-topologies)) (is (= #{"topo1" "topo2"} user2-topologies))))) @@ -1903,6 +1967,6 @@ (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME))))) (let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1")) user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))] - + (is (= (list "topo1" "authorized") supervisor-topologies)) (is (= #{"authorized"} user-topologies))))) http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 
3f4a773..4fb4474 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm; import java.lang.reflect.Method; @@ -31,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; - import org.apache.storm.blobstore.BlobStore; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; @@ -60,10 +60,9 @@ import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.KillOptions; import org.apache.storm.generated.ListBlobsResult; import org.apache.storm.generated.LogConfig; -import org.apache.storm.generated.Nimbus.Iface; -import org.apache.storm.generated.Nimbus.Processor; import org.apache.storm.generated.NimbusSummary; import org.apache.storm.generated.NotAliveException; +import org.apache.storm.generated.OwnerResourceSummary; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.ReadableBlobMeta; @@ -75,6 +74,8 @@ import org.apache.storm.generated.SupervisorPageInfo; import org.apache.storm.generated.TopologyHistoryInfo; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.generated.TopologyPageInfo; +import org.apache.storm.generated.Nimbus.Iface; +import org.apache.storm.generated.Nimbus.Processor; import org.apache.storm.messaging.IContext; import org.apache.storm.messaging.local.Context; import org.apache.storm.nimbus.ILeaderElector; @@ -884,33 +885,33 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public void setLogConfig(String name, LogConfig config) throws TException { // TODO 
Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public LogConfig getLogConfig(String name) throws TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public void debug(String name, String component, boolean enable, double samplingPercentage) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public void setWorkerProfiler(String id, ProfileRequest profileRequest) throws TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public List<ProfileRequest> getComponentPendingProfileActions(String id, String component_id, ProfileAction action) throws TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override @@ -988,7 +989,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public void createStateInZookeeper(String key) throws TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override @@ -1020,7 +1021,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public String getNimbusConf() throws AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override @@ -1037,40 +1038,40 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { public TopologyInfo 
getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public ComponentPageInfo getComponentPageInfo(String topology_id, String component_id, String window, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException { // TODO Auto-generated method stub - throw new RuntimeException("NOT IMPLMENETED YET"); + throw new RuntimeException("NOT IMPLEMENTED YET"); } /** @@ -1095,6 +1096,12 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { return ret; } } + + @Override + public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) throws AuthorizationException, 
TException { + // TODO Auto-generated method stub + throw new RuntimeException("NOT IMPLEMENTED YET"); + } public static void main(final String [] args) throws Exception { if (args.length < 1) {