http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 087fe6b..db0e263 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -112,6 +112,7 @@ public class ResourceAwareScheduler implements IScheduler { cluster.setStatusMap(schedulingState.cluster.getStatusMap()); cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap()); cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap()); + cluster.setWorkerResourcesMap(schedulingState.cluster.getWorkerResourcesMap()); //updating resources used by supervisor updateSupervisorsResources(cluster, topologies); } @@ -243,6 +244,8 @@ public class ResourceAwareScheduler implements IScheduler { double assignedMemOffHeap = 0.0; double assignedCpu = 0.0; + Map<WorkerSlot, Double[]> workerResources = new HashMap<WorkerSlot, Double[]>(); + Set<String> nodesUsed = new HashSet<String>(); for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) { WorkerSlot targetSlot = workerToTasksEntry.getKey(); @@ -265,6 +268,11 @@ public class ResourceAwareScheduler implements IScheduler { assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap(); assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap(); assignedCpu += targetSlot.getAllocatedCpu(); + + Double[] worker_resources = { + requestedMemOnHeap, requestedMemOffHeap, requestedCpu, + targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()}; + workerResources.put 
(targetSlot, worker_resources); } Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, @@ -275,6 +283,7 @@ public class ResourceAwareScheduler implements IScheduler { assignedMemOnHeap, assignedMemOffHeap, assignedCpu); //updating resources used for a topology this.schedulingState.cluster.setTopologyResources(td.getId(), resources); + this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources); return true; } else { LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java index 7cc1556..056505c 100644 --- a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java +++ b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java @@ -42,7 +42,12 @@ import org.slf4j.LoggerFactory; public class SimpleACLAuthorizer implements IAuthorizer { private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class); - protected Set<String> _userCommands = new HashSet<>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo")); + protected Set<String> _userCommands = new HashSet<>(Arrays.asList( + "submitTopology", + "fileUpload", + "getNimbusConf", + "getClusterInfo", + "getSupervisorPageInfo")); protected Set<String> _supervisorCommands = new HashSet<>(Arrays.asList("fileDownload")); protected Set<String> _topoCommands = new HashSet<>(Arrays.asList( "killTopology", http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index c9cfc0f..0524dc8 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -42,6 +42,9 @@ import org.apache.storm.generated.SpoutStats; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.TopologyPageInfo; import org.apache.storm.generated.TopologyStats; +import 
org.apache.storm.generated.WorkerResources; +import org.apache.storm.generated.WorkerSummary; +import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.slf4j.Logger; @@ -1249,6 +1252,135 @@ public class StatsUtil { return thriftifyCompPageData(topologyId, topology, componentId, compStats); } + /** + * Aggregate statistics per worker for a topology, optionally filtering on a specific supervisor. + * + * One WorkerSummary is built per worker slot (node id + port) found in exec2NodePort. + * NOTE(review): uptime_secs is set again for every executor processed, so a worker reports the + * value of the last executor visited for its slot (0 when that executor has no heartbeat). + * + * @param stormId topology id, copied into every WorkerSummary + * @param stormName topology name, copied into every WorkerSummary + * @param task2Component a Map of {task id -> component}, note it's a clojure map + * @param beats a converted HashMap of executor heartbeats, {executor -> heartbeat}; may be null + * @param exec2NodePort a Map of {executor [first task, last task] -> [node id, port]}, note it's a + * clojure map; if null an empty list is returned + * @param nodeHost a Map of {node id -> host}; dereferenced without a null check + * @param worker2Resources a Map of {worker slot -> assigned resources}; dereferenced without a null + * check, slots without an entry are reported with zero assigned memory and cpu + * @param includeSys whether to count tasks of system components + * @param userAuthorized whether the user is authorized to view topology info; when false the + * component-to-task counts are left empty + * @param filterSupervisor if not null, only return WorkerSummaries whose node id equals this value + * + * @return List<WorkerSummary> thrift structures, one per worker slot + */ + public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, + Map<Integer, String> task2Component, + Map<List<Integer>, Map<String, Object>> beats, + Map<List<Long>, List<Object>> exec2NodePort, + Map<String, String> nodeHost, + Map<WorkerSlot, WorkerResources> worker2Resources, + boolean includeSys, boolean userAuthorized, String filterSupervisor) { + + // worker slot (node id + port) => WorkerSummary + HashMap<WorkerSlot, WorkerSummary> workerSummaryMap = new HashMap<>(); + + if (exec2NodePort != null) { + // for each executor -> node+port pair + for (Map.Entry<List<Long>, List<Object>> execNodePort : exec2NodePort.entrySet()) { + List<Object> nodePort = execNodePort.getValue(); + String node = (String)nodePort.get(0); + Long port = (Long)nodePort.get(1); + String host = nodeHost.get(node); + WorkerSlot slot = new WorkerSlot(node, port); + WorkerResources resources = worker2Resources.get(slot); + + if 
(filterSupervisor == null || node.equals(filterSupervisor)) { + WorkerSummary ws = workerSummaryMap.get(slot); + + if (ws == null) { + ws = new WorkerSummary(); + ws.set_host(host); + ws.set_port(port.intValue()); + ws.set_supervisor_id(node); + ws.set_topology_id(stormId); + ws.set_topology_name(stormName); + ws.set_num_executors(0); + if (resources != null) { + ws.set_assigned_memonheap(resources.get_mem_on_heap()); + ws.set_assigned_memoffheap(resources.get_mem_off_heap()); + ws.set_assigned_cpu(resources.get_cpu()); + } else { + ws.set_assigned_memonheap(0); + ws.set_assigned_memoffheap(0); + ws.set_assigned_cpu(0); + } + ws.set_component_to_num_tasks(new HashMap<String,Long>()); + workerSummaryMap.put(slot, ws); + } + Map<String, Long> componentToNumTasks = ws.get_component_to_num_tasks(); + + // gets min/max task pairs (executors): [1 1] [2 3] ... + List<Long> exec = execNodePort.getKey(); + // get executor heartbeat + int hbeatSecs = 0; + if (beats != null) { + Map<String, Object> beat = beats.get(convertExecutor(exec)); + if (beat != null) { + Map<String, Object> hbeat = (Map<String, Object>)beat.get("heartbeat"); + hbeatSecs = hbeat == null ? 
0 : (int) hbeat.get("uptime"); + } + } + ws.set_uptime_secs(hbeatSecs); + ws.set_num_executors(ws.get_num_executors() + 1); + + // get tasks if the user is authorized for this topology + if (userAuthorized) { + int firstTask = exec.get(0).intValue(); + int lastTask = exec.get(1).intValue(); + + // get per task components + for (int task = firstTask; task <= lastTask; task++) { + String component = task2Component.get(task); + // if the component is a system (__*) component and we are hiding + // them in UI, keep going + if (!includeSys && Utils.isSystemId(component)) { + continue; + } + + // good to go, increment # of tasks this component is being executed on + Long counter = componentToNumTasks.get(component); + if (counter == null) { + counter = new Long(0); + } + componentToNumTasks.put(component, counter + 1); + } + } + } + } + } + return new ArrayList<WorkerSummary>(workerSummaryMap.values()); + } + + /** + * Aggregate statistics per worker for a topology, without filtering on a supervisor. + * + * Convenience overload when called from the topology page code (in that case we want data + * for all workers in the topology, not filtered by supervisor); it delegates to the + * ten-argument overload with a null filterSupervisor. + * + * @param stormId topology id + * @param stormName topology name + * @param task2Component a Map of {task id -> component}, note it's a clojure map + * @param beats a converted HashMap of executor heartbeats, {executor -> heartbeat} + * @param exec2NodePort a Map of {executor -> [node id, port]}, note it's a clojure map + * @param nodeHost a Map of {node id -> host} + * @param worker2Resources a Map of {worker slot -> assigned resources} + * @param includeSys whether to count tasks of system components + * @param userAuthorized whether the user is authorized to view topology info + * + * @return List<WorkerSummary> thrift structures + */ + public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, + Map<Integer, String> task2Component, + Map<List<Integer>, Map<String, Object>> beats, + Map<List<Long>, List<Object>> exec2NodePort, + Map<String, String> nodeHost, + Map<WorkerSlot, WorkerResources> 
worker2Resources, + boolean includeSys, boolean userAuthorized) { + return aggWorkerStats(stormId, stormName, + task2Component, beats, exec2NodePort, nodeHost, worker2Resources, + includeSys, userAuthorized, null); + } // ===================================================================================== // convert thrift stats to java maps http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/py/storm/Nimbus-remote ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote index 5b8e396..b39050e 100644 --- a/storm-core/src/py/storm/Nimbus-remote +++ b/storm-core/src/py/storm/Nimbus-remote @@ -79,6 +79,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' TopologyInfo getTopologyInfo(string id)') print(' TopologyInfo getTopologyInfoWithOpts(string id, GetInfoOptions options)') print(' TopologyPageInfo getTopologyPageInfo(string id, string window, bool is_include_sys)') + print(' SupervisorPageInfo getSupervisorPageInfo(string id, string host, bool is_include_sys)') print(' ComponentPageInfo getComponentPageInfo(string topology_id, string component_id, string window, bool is_include_sys)') print(' string getTopologyConf(string id)') print(' StormTopology getTopology(string id)') @@ -362,6 +363,12 @@ elif cmd == 'getTopologyPageInfo': sys.exit(1) pp.pprint(client.getTopologyPageInfo(args[0],args[1],eval(args[2]),)) +elif cmd == 'getSupervisorPageInfo': + if len(args) != 3: + print('getSupervisorPageInfo requires 3 args') + sys.exit(1) + pp.pprint(client.getSupervisorPageInfo(args[0],args[1],eval(args[2]),)) + elif cmd == 'getComponentPageInfo': if len(args) != 4: print('getComponentPageInfo requires 4 args') http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/py/storm/Nimbus.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/Nimbus.py 
b/storm-core/src/py/storm/Nimbus.py index 3b680ec..dba051a 100644 --- a/storm-core/src/py/storm/Nimbus.py +++ b/storm-core/src/py/storm/Nimbus.py @@ -315,6 +315,15 @@ class Iface: """ pass + def getSupervisorPageInfo(self, id, host, is_include_sys): + """ + Parameters: + - id + - host + - is_include_sys + """ + pass + def getComponentPageInfo(self, topology_id, component_id, window, is_include_sys): """ Parameters: @@ -1625,6 +1634,45 @@ class Client(Iface): raise result.aze raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyPageInfo failed: unknown result") + def getSupervisorPageInfo(self, id, host, is_include_sys): + """ + Parameters: + - id + - host + - is_include_sys + """ + self.send_getSupervisorPageInfo(id, host, is_include_sys) + return self.recv_getSupervisorPageInfo() + + def send_getSupervisorPageInfo(self, id, host, is_include_sys): + self._oprot.writeMessageBegin('getSupervisorPageInfo', TMessageType.CALL, self._seqid) + args = getSupervisorPageInfo_args() + args.id = id + args.host = host + args.is_include_sys = is_include_sys + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_getSupervisorPageInfo(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = getSupervisorPageInfo_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + if result.e is not None: + raise result.e + if result.aze is not None: + raise result.aze + raise TApplicationException(TApplicationException.MISSING_RESULT, "getSupervisorPageInfo failed: unknown result") + def getComponentPageInfo(self, topology_id, component_id, window, is_include_sys): """ Parameters: @@ -1850,6 +1898,7 @@ class Processor(Iface, TProcessor): self._processMap["getTopologyInfo"] = Processor.process_getTopologyInfo 
self._processMap["getTopologyInfoWithOpts"] = Processor.process_getTopologyInfoWithOpts self._processMap["getTopologyPageInfo"] = Processor.process_getTopologyPageInfo + self._processMap["getSupervisorPageInfo"] = Processor.process_getSupervisorPageInfo self._processMap["getComponentPageInfo"] = Processor.process_getComponentPageInfo self._processMap["getTopologyConf"] = Processor.process_getTopologyConf self._processMap["getTopology"] = Processor.process_getTopology @@ -2739,6 +2788,31 @@ class Processor(Iface, TProcessor): oprot.writeMessageEnd() oprot.trans.flush() + def process_getSupervisorPageInfo(self, seqid, iprot, oprot): + args = getSupervisorPageInfo_args() + args.read(iprot) + iprot.readMessageEnd() + result = getSupervisorPageInfo_result() + try: + result.success = self._handler.getSupervisorPageInfo(args.id, args.host, args.is_include_sys) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except NotAliveException as e: + msg_type = TMessageType.REPLY + result.e = e + except AuthorizationException as aze: + msg_type = TMessageType.REPLY + result.aze = aze + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("getSupervisorPageInfo", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + def process_getComponentPageInfo(self, seqid, iprot, oprot): args = getComponentPageInfo_args() args.read(iprot) @@ -4723,11 +4797,11 @@ class getComponentPendingProfileActions_result: if fid == 0: if ftype == TType.LIST: self.success = [] - (_etype662, _size659) = iprot.readListBegin() - for _i663 in xrange(_size659): - _elem664 = ProfileRequest() - _elem664.read(iprot) - self.success.append(_elem664) + (_etype692, _size689) = iprot.readListBegin() + for _i693 in xrange(_size689): + _elem694 = ProfileRequest() + 
_elem694.read(iprot) + self.success.append(_elem694) iprot.readListEnd() else: iprot.skip(ftype) @@ -4744,8 +4818,8 @@ class getComponentPendingProfileActions_result: if self.success is not None: oprot.writeFieldBegin('success', TType.LIST, 0) oprot.writeListBegin(TType.STRUCT, len(self.success)) - for iter665 in self.success: - iter665.write(oprot) + for iter695 in self.success: + iter695.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -8457,6 +8531,190 @@ class getTopologyPageInfo_result: def __ne__(self, other): return not (self == other) +class getSupervisorPageInfo_args: + """ + Attributes: + - id + - host + - is_include_sys + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'id', None, None, ), # 1 + (2, TType.STRING, 'host', None, None, ), # 2 + (3, TType.BOOL, 'is_include_sys', None, None, ), # 3 + ) + + def __init__(self, id=None, host=None, is_include_sys=None,): + self.id = id + self.host = host + self.is_include_sys = is_include_sys + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.id = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.host = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.BOOL: + self.is_include_sys = iprot.readBool() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is 
not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('getSupervisorPageInfo_args') + if self.id is not None: + oprot.writeFieldBegin('id', TType.STRING, 1) + oprot.writeString(self.id.encode('utf-8')) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRING, 2) + oprot.writeString(self.host.encode('utf-8')) + oprot.writeFieldEnd() + if self.is_include_sys is not None: + oprot.writeFieldBegin('is_include_sys', TType.BOOL, 3) + oprot.writeBool(self.is_include_sys) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.id) + value = (value * 31) ^ hash(self.host) + value = (value * 31) ^ hash(self.is_include_sys) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class getSupervisorPageInfo_result: + """ + Attributes: + - success + - e + - aze + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (SupervisorPageInfo, SupervisorPageInfo.thrift_spec), None, ), # 0 + (1, TType.STRUCT, 'e', (NotAliveException, NotAliveException.thrift_spec), None, ), # 1 + (2, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 2 + ) + + def __init__(self, success=None, e=None, aze=None,): + self.success = success + self.e = e + self.aze = aze + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = SupervisorPageInfo() + self.success.read(iprot) + else: + iprot.skip(ftype) + elif fid == 1: + if ftype == TType.STRUCT: + self.e = NotAliveException() + self.e.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRUCT: + self.aze = AuthorizationException() + self.aze.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('getSupervisorPageInfo_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + if self.e is not None: + oprot.writeFieldBegin('e', TType.STRUCT, 1) + self.e.write(oprot) + oprot.writeFieldEnd() + if self.aze is not None: + oprot.writeFieldBegin('aze', TType.STRUCT, 2) + self.aze.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + value = (value * 31) ^ hash(self.e) + value = (value * 31) ^ hash(self.aze) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class getComponentPageInfo_args: """ Attributes:
