Adding nimbus summary info to zookeeper.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4502bffb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4502bffb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4502bffb Branch: refs/heads/0.11.x-branch Commit: 4502bffbe3f9b4cd3674a56afbda1bb115cec239 Parents: 1b6491f Author: Parth Brahmbhatt <[email protected]> Authored: Thu Feb 12 11:27:50 2015 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Thu Feb 12 11:27:50 2015 -0800 ---------------------------------------------------------------------- storm-core/src/clj/backtype/storm/cluster.clj | 24 +- storm-core/src/clj/backtype/storm/config.clj | 10 + .../src/clj/backtype/storm/daemon/nimbus.clj | 27 +- storm-core/src/clj/backtype/storm/ui/core.clj | 28 +- .../storm/generated/ClusterSummary.java | 232 +++--- .../backtype/storm/generated/NimbusSummary.java | 723 +++++++++++++++++++ storm-core/src/py/storm/ttypes.py | 577 +++++++++------ storm-core/src/storm.thrift | 10 +- .../public/templates/index-page-template.html | 26 +- 9 files changed, 1297 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 2c58510..3bf6628 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -146,6 +146,12 @@ (code-distributor [this callback]) ;returns lits of nimbusinfos under /stormroot/code-distributor/storm-id (code-distributor-info [this storm-id]) + + ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data> + (nimbuses [this]) + ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id + (add-nimbus-host! [this nimbus-id nimbus-summary]) + (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) @@ -180,14 +186,17 @@ (def WORKERBEATS-ROOT "workerbeats") (def ERRORS-ROOT "errors") (def CODE-DISTRIBUTOR-ROOT "code-distributor") +(def NIMBUSES-ROOT "nimbuses") (def CREDENTIALS-ROOT "credentials") + (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) (def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) (def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT)) +(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT)) (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT)) (defn supervisor-path @@ -202,6 +211,10 @@ [id] (str CODE-DISTRIBUTOR-SUBTREE "/" id)) +(defn nimbus-path + [id] + (str NIMBUSES-SUBTREE "/" id)) + (defn storm-path [id] (str STORMS-SUBTREE "/" id)) @@ -292,7 +305,7 @@ CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) ;; this should never happen (exit-process! 30 "Unknown callback for subtree " subtree args)))))] - (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE]] + (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE]] (mkdirs cluster-state p acls)) (reify StormClusterState @@ -330,6 +343,15 @@ (reset! code-distributor-callback callback)) (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback))) + (nimbuses + [this] + (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false)) + (get-children cluster-state NIMBUSES-SUBTREE false))) + + (add-nimbus-host! + [this nimbus-id nimbus-summary] + (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)) + (code-distributor-info [this storm-id] (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state (code-distributor-path storm-id) false))) http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/config.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index a6b160d..f3c70e5 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -282,3 +282,13 @@ (defn ^LocalState worker-state [conf id] (LocalState. (worker-heartbeats-root conf id))) + +(defn read-storm-version + "Returns a string containing the Storm version or 'Unknown'." + [] + (let [storm-home (System/getProperty "storm.home") + release-path (format "%s/RELEASE" storm-home) + release-file (File. release-path)] + (if (and (.exists release-file) (.isFile release-file)) + (str/trim (slurp release-path)) + "Unknown"))) http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e354fab..52ee708 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -15,7 +15,8 @@ ;; limitations under the License. (ns backtype.storm.daemon.nimbus (:import [java.nio ByteBuffer] - [java.util Collections]) + [java.util Collections] + [backtype.storm.generated NimbusSummary]) (:import [java.io FileNotFoundException]) (:import [java.net InetAddress]) (:import [java.nio.channels Channels WritableByteChannel]) @@ -104,6 +105,7 @@ :id->sched-status (atom {}) :cred-renewers (AuthUtils/GetCredentialRenewers conf) :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf) + :nimbuses-cache (atom {}) ;;TODO need to figure out how to keep the cache upto date, one more thread })) (defn inbox [nimbus] @@ -1030,6 +1032,17 @@ (let [nimbus (nimbus-data conf inimbus) principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)] (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf) + + ;add to nimbuses + (.add-nimbus-host! (:storm-cluster-state nimbus) + (.toHostPortString (:nimbus-host-port-info nimbus)) + { + :host (.getHost (:nimbus-host-port-info nimbus)) + :port (.getPort (:nimbus-host-port-info nimbus)) + :start-time-secs (current-time-secs) + :version (read-storm-version) + }) + (.addToLeaderLockQueue (:leader-elector nimbus)) (cleanup-corrupt-topologies! nimbus) ;register call back for code-distributor @@ -1287,8 +1300,14 @@ (count (:used-ports info)) id ) )) - nimbus-uptime ((:uptime nimbus)) bases (topology-bases storm-cluster-state) + nimbuses (.nimbuses storm-cluster-state) + nimbuses (map #(NimbusSummary. (:host %1) (:port %1) (time-delta (:start-time-secs %1)) + (let [leader (.getLeader (:leader-elector nimbus))] + (and (= (.getHost leader) (:host %1)) (= (.getPort leader) (:port %1)))) + (:version %1)) + nimbuses + ) topology-summaries (dofor [[id base] bases :when base] (let [assignment (.assignment-info storm-cluster-state id nil) topo-summ (TopologySummary. id @@ -1312,8 +1331,8 @@ topo-summ ))] (ClusterSummary. supervisor-summaries - nimbus-uptime - topology-summaries) + topology-summaries + nimbuses) )) (^TopologyInfo getTopologyInfo [this ^String storm-id] http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 3c7f578..94b0311 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -78,16 +78,6 @@ (map #(.get_stats ^ExecutorSummary %)) (filter not-nil?))) -(defn read-storm-version - "Returns a string containing the Storm version or 'Unknown'." - [] - (let [storm-home (System/getProperty "storm.home") - release-path (format "%s/RELEASE" storm-home) - release-file (File. release-path)] - (if (and (.exists release-file) (.isFile release-file)) - (trim (slurp release-path)) - "Unknown"))) - (defn component-type "Returns the component type (either :bolt or :spout) for a given topology and component id. Returns nil if not found." @@ -520,7 +510,6 @@ (reduce +))] {"user" user "stormVersion" (read-storm-version) - "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ)) "supervisors" (count sups) "slotsTotal" total-slots "slotsUsed" used-slots @@ -530,18 +519,19 @@ (defn nimbus-summary ([] - (let [leader-elector (zk-leader-elector *STORM-CONF*) - nimbus-hosts (.getAllNimbuses leader-elector) - no-op (.close leader-elector)] - (nimbus-summary nimbus-hosts))) + (with-nimbus nimbus + (nimbus-summary + (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus))))) ([nimbuses] {"nimbuses" (for [^NimbusInfo n nimbuses] { - "host" (.getHost n) - "port" (.getPort n) - "nimbusLogLink" (nimbus-log-link (.getHost n) (.getPort n)) - "isLeader" (.isLeader n)})})) + "host" (.get_host n) + "port" (.get_port n) + "nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n)) + "isLeader" (.is_isLeader n) + "version" (.get_version n) + "nimbusUpTime" (pretty-uptime-sec (.get_uptimeSecs n))})})) (defn supervisor-summary ([] http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index a2623ab..7e32c72 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -42,18 +42,18 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterSummary"); private static final org.apache.thrift.protocol.TField SUPERVISORS_FIELD_DESC = new org.apache.thrift.protocol.TField("supervisors", org.apache.thrift.protocol.TType.LIST, (short)1); - private static final org.apache.thrift.protocol.TField NIMBUS_UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbus_uptime_secs", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField TOPOLOGIES_FIELD_DESC = new org.apache.thrift.protocol.TField("topologies", org.apache.thrift.protocol.TType.LIST, (short)3); + private static final org.apache.thrift.protocol.TField NIMBUSES_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbuses", org.apache.thrift.protocol.TType.LIST, (short)4); private List<SupervisorSummary> supervisors; // required - private int nimbus_uptime_secs; // required private List<TopologySummary> topologies; // required + private List<NimbusSummary> nimbuses; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { SUPERVISORS((short)1, "supervisors"), - NIMBUS_UPTIME_SECS((short)2, "nimbus_uptime_secs"), - TOPOLOGIES((short)3, "topologies"); + TOPOLOGIES((short)3, "topologies"), + NIMBUSES((short)4, "nimbuses"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -70,10 +70,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C switch(fieldId) { case 1: // SUPERVISORS return SUPERVISORS; - case 2: // NIMBUS_UPTIME_SECS - return NIMBUS_UPTIME_SECS; case 3: // TOPOLOGIES return TOPOLOGIES; + case 4: // NIMBUSES + return NIMBUSES; default: return null; } @@ -114,8 +114,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } // isset id assignments - private static final int __NIMBUS_UPTIME_SECS_ISSET_ID = 0; - private BitSet __isset_bit_vector = new BitSet(1); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -123,11 +121,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C tmpMap.put(_Fields.SUPERVISORS, new org.apache.thrift.meta_data.FieldMetaData("supervisors", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, SupervisorSummary.class)))); - tmpMap.put(_Fields.NIMBUS_UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("nimbus_uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.TOPOLOGIES, new org.apache.thrift.meta_data.FieldMetaData("topologies", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologySummary.class)))); + tmpMap.put(_Fields.NIMBUSES, new org.apache.thrift.meta_data.FieldMetaData("nimbuses", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NimbusSummary.class)))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterSummary.class, metaDataMap); } @@ -137,22 +136,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C public ClusterSummary( List<SupervisorSummary> supervisors, - int nimbus_uptime_secs, - List<TopologySummary> topologies) + List<TopologySummary> topologies, + List<NimbusSummary> nimbuses) { this(); this.supervisors = supervisors; - this.nimbus_uptime_secs = nimbus_uptime_secs; - set_nimbus_uptime_secs_isSet(true); this.topologies = topologies; + this.nimbuses = nimbuses; } /** * Performs a deep copy on <i>other</i>. */ public ClusterSummary(ClusterSummary other) { - __isset_bit_vector.clear(); - __isset_bit_vector.or(other.__isset_bit_vector); if (other.is_set_supervisors()) { List<SupervisorSummary> __this__supervisors = new ArrayList<SupervisorSummary>(); for (SupervisorSummary other_element : other.supervisors) { @@ -160,7 +156,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } this.supervisors = __this__supervisors; } - this.nimbus_uptime_secs = other.nimbus_uptime_secs; if (other.is_set_topologies()) { List<TopologySummary> __this__topologies = new ArrayList<TopologySummary>(); for (TopologySummary other_element : other.topologies) { @@ -168,6 +163,13 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } this.topologies = __this__topologies; } + if (other.is_set_nimbuses()) { + List<NimbusSummary> __this__nimbuses = new ArrayList<NimbusSummary>(); + for (NimbusSummary other_element : other.nimbuses) { + __this__nimbuses.add(new NimbusSummary(other_element)); + } + this.nimbuses = __this__nimbuses; + } } public ClusterSummary deepCopy() { @@ -177,9 +179,8 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C @Override public void clear() { this.supervisors = null; - set_nimbus_uptime_secs_isSet(false); - this.nimbus_uptime_secs = 0; this.topologies = null; + this.nimbuses = null; } public int get_supervisors_size() { @@ -220,28 +221,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } } - public int get_nimbus_uptime_secs() { - return this.nimbus_uptime_secs; - } - - public void set_nimbus_uptime_secs(int nimbus_uptime_secs) { - this.nimbus_uptime_secs = nimbus_uptime_secs; - set_nimbus_uptime_secs_isSet(true); - } - - public void unset_nimbus_uptime_secs() { - __isset_bit_vector.clear(__NIMBUS_UPTIME_SECS_ISSET_ID); - } - - /** Returns true if field nimbus_uptime_secs is set (has been assigned a value) and false otherwise */ - public boolean is_set_nimbus_uptime_secs() { - return __isset_bit_vector.get(__NIMBUS_UPTIME_SECS_ISSET_ID); - } - - public void set_nimbus_uptime_secs_isSet(boolean value) { - __isset_bit_vector.set(__NIMBUS_UPTIME_SECS_ISSET_ID, value); - } - public int get_topologies_size() { return (this.topologies == null) ? 0 : this.topologies.size(); } @@ -280,6 +259,44 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } } + public int get_nimbuses_size() { + return (this.nimbuses == null) ? 0 : this.nimbuses.size(); + } + + public java.util.Iterator<NimbusSummary> get_nimbuses_iterator() { + return (this.nimbuses == null) ? null : this.nimbuses.iterator(); + } + + public void add_to_nimbuses(NimbusSummary elem) { + if (this.nimbuses == null) { + this.nimbuses = new ArrayList<NimbusSummary>(); + } + this.nimbuses.add(elem); + } + + public List<NimbusSummary> get_nimbuses() { + return this.nimbuses; + } + + public void set_nimbuses(List<NimbusSummary> nimbuses) { + this.nimbuses = nimbuses; + } + + public void unset_nimbuses() { + this.nimbuses = null; + } + + /** Returns true if field nimbuses is set (has been assigned a value) and false otherwise */ + public boolean is_set_nimbuses() { + return this.nimbuses != null; + } + + public void set_nimbuses_isSet(boolean value) { + if (!value) { + this.nimbuses = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case SUPERVISORS: @@ -290,19 +307,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } break; - case NIMBUS_UPTIME_SECS: + case TOPOLOGIES: if (value == null) { - unset_nimbus_uptime_secs(); + unset_topologies(); } else { - set_nimbus_uptime_secs((Integer)value); + set_topologies((List<TopologySummary>)value); } break; - case TOPOLOGIES: + case NIMBUSES: if (value == null) { - unset_topologies(); + unset_nimbuses(); } else { - set_topologies((List<TopologySummary>)value); + set_nimbuses((List<NimbusSummary>)value); } break; @@ -314,12 +331,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C case SUPERVISORS: return get_supervisors(); - case NIMBUS_UPTIME_SECS: - return Integer.valueOf(get_nimbus_uptime_secs()); - case TOPOLOGIES: return get_topologies(); + case NIMBUSES: + return get_nimbuses(); + } throw new IllegalStateException(); } @@ -333,10 +350,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C switch (field) { case SUPERVISORS: return is_set_supervisors(); - case NIMBUS_UPTIME_SECS: - return is_set_nimbus_uptime_secs(); case TOPOLOGIES: return is_set_topologies(); + case NIMBUSES: + return is_set_nimbuses(); } throw new IllegalStateException(); } @@ -363,15 +380,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C return false; } - boolean this_present_nimbus_uptime_secs = true; - boolean that_present_nimbus_uptime_secs = true; - if (this_present_nimbus_uptime_secs || that_present_nimbus_uptime_secs) { - if (!(this_present_nimbus_uptime_secs && that_present_nimbus_uptime_secs)) - return false; - if (this.nimbus_uptime_secs != that.nimbus_uptime_secs) - return false; - } - boolean this_present_topologies = true && this.is_set_topologies(); boolean that_present_topologies = true && that.is_set_topologies(); if (this_present_topologies || that_present_topologies) { @@ -381,6 +389,15 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C return false; } + boolean this_present_nimbuses = true && this.is_set_nimbuses(); + boolean that_present_nimbuses = true && that.is_set_nimbuses(); + if (this_present_nimbuses || that_present_nimbuses) { + if (!(this_present_nimbuses && that_present_nimbuses)) + return false; + if (!this.nimbuses.equals(that.nimbuses)) + return false; + } + return true; } @@ -393,16 +410,16 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C if (present_supervisors) builder.append(supervisors); - boolean present_nimbus_uptime_secs = true; - builder.append(present_nimbus_uptime_secs); - if (present_nimbus_uptime_secs) - builder.append(nimbus_uptime_secs); - boolean present_topologies = true && (is_set_topologies()); builder.append(present_topologies); if (present_topologies) builder.append(topologies); + boolean present_nimbuses = true && (is_set_nimbuses()); + builder.append(present_nimbuses); + if (present_nimbuses) + builder.append(nimbuses); + return builder.toHashCode(); } @@ -424,22 +441,22 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C return lastComparison; } } - lastComparison = Boolean.valueOf(is_set_nimbus_uptime_secs()).compareTo(typedOther.is_set_nimbus_uptime_secs()); + lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies()); if (lastComparison != 0) { return lastComparison; } - if (is_set_nimbus_uptime_secs()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbus_uptime_secs, typedOther.nimbus_uptime_secs); + if (is_set_topologies()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies); if (lastComparison != 0) { return lastComparison; } } - lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies()); + lastComparison = Boolean.valueOf(is_set_nimbuses()).compareTo(typedOther.is_set_nimbuses()); if (lastComparison != 0) { return lastComparison; } - if (is_set_topologies()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies); + if (is_set_nimbuses()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbuses, typedOther.nimbuses); if (lastComparison != 0) { return lastComparison; } @@ -479,14 +496,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; - case 2: // NIMBUS_UPTIME_SECS - if (field.type == org.apache.thrift.protocol.TType.I32) { - this.nimbus_uptime_secs = iprot.readI32(); - set_nimbus_uptime_secs_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); - } - break; case 3: // TOPOLOGIES if (field.type == org.apache.thrift.protocol.TType.LIST) { { @@ -505,6 +514,24 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 4: // NIMBUSES + if (field.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list43 = iprot.readListBegin(); + this.nimbuses = new ArrayList<NimbusSummary>(_list43.size); + for (int _i44 = 0; _i44 < _list43.size; ++_i44) + { + NimbusSummary _elem45; // required + _elem45 = new NimbusSummary(); + _elem45.read(iprot); + this.nimbuses.add(_elem45); + } + iprot.readListEnd(); + } + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -522,24 +549,33 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C oprot.writeFieldBegin(SUPERVISORS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.supervisors.size())); - for (SupervisorSummary _iter43 : this.supervisors) + for (SupervisorSummary _iter46 : this.supervisors) { - _iter43.write(oprot); + _iter46.write(oprot); } oprot.writeListEnd(); } oprot.writeFieldEnd(); } - oprot.writeFieldBegin(NIMBUS_UPTIME_SECS_FIELD_DESC); - oprot.writeI32(this.nimbus_uptime_secs); - oprot.writeFieldEnd(); if (this.topologies != null) { oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.topologies.size())); - for (TopologySummary _iter44 : this.topologies) + for (TopologySummary _iter47 : this.topologies) + { + _iter47.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + if (this.nimbuses != null) { + oprot.writeFieldBegin(NIMBUSES_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.nimbuses.size())); + for (NimbusSummary _iter48 : this.nimbuses) { - _iter44.write(oprot); + _iter48.write(oprot); } oprot.writeListEnd(); } @@ -562,10 +598,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C } first = false; if (!first) sb.append(", "); - sb.append("nimbus_uptime_secs:"); - sb.append(this.nimbus_uptime_secs); - first = false; - if (!first) sb.append(", "); sb.append("topologies:"); if (this.topologies == null) { sb.append("null"); @@ -573,6 +605,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C sb.append(this.topologies); } first = false; + if (!first) sb.append(", "); + sb.append("nimbuses:"); + if (this.nimbuses == null) { + sb.append("null"); + } else { + sb.append(this.nimbuses); + } + first = false; sb.append(")"); return sb.toString(); } @@ -583,14 +623,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C throw new org.apache.thrift.protocol.TProtocolException("Required field 'supervisors' is unset! Struct:" + toString()); } - if (!is_set_nimbus_uptime_secs()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbus_uptime_secs' is unset! Struct:" + toString()); - } - if (!is_set_topologies()) { throw new org.apache.thrift.protocol.TProtocolException("Required field 'topologies' is unset! Struct:" + toString()); } + if (!is_set_nimbuses()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbuses' is unset! Struct:" + toString()); + } + } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -603,8 +643,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java new file mode 100644 index 0000000..195048a --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java @@ -0,0 +1,723 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.7.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package backtype.storm.generated; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NimbusSummary implements org.apache.thrift.TBase<NimbusSummary, NimbusSummary._Fields>, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NimbusSummary"); + + private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2); + private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptimeSecs", org.apache.thrift.protocol.TType.I32, (short)3); + private static final org.apache.thrift.protocol.TField IS_LEADER_FIELD_DESC = new org.apache.thrift.protocol.TField("isLeader", org.apache.thrift.protocol.TType.BOOL, (short)4); + private static final org.apache.thrift.protocol.TField VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("version", org.apache.thrift.protocol.TType.STRING, (short)5); + + private String host; // required + private int port; // required + private int uptimeSecs; // required + private boolean isLeader; // required + private String version; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + HOST((short)1, "host"), + PORT((short)2, "port"), + UPTIME_SECS((short)3, "uptimeSecs"), + IS_LEADER((short)4, "isLeader"), + VERSION((short)5, "version"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // HOST + return HOST; + case 2: // PORT + return PORT; + case 3: // UPTIME_SECS + return UPTIME_SECS; + case 4: // IS_LEADER + return IS_LEADER; + case 5: // VERSION + return VERSION; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __PORT_ISSET_ID = 0; + private static final int __UPTIMESECS_ISSET_ID = 1; + private static final int __ISLEADER_ISSET_ID = 2; + private BitSet __isset_bit_vector = new BitSet(3); + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptimeSecs", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.IS_LEADER, new org.apache.thrift.meta_data.FieldMetaData("isLeader", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.VERSION, new org.apache.thrift.meta_data.FieldMetaData("version", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NimbusSummary.class, metaDataMap); + } + + public NimbusSummary() { + } + + public NimbusSummary( + String host, + int port, + int uptimeSecs, + boolean isLeader, + String version) + { + this(); + this.host = host; + this.port = port; + set_port_isSet(true); + this.uptimeSecs = uptimeSecs; + set_uptimeSecs_isSet(true); + this.isLeader = isLeader; + set_isLeader_isSet(true); + this.version = version; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public NimbusSummary(NimbusSummary other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + if (other.is_set_host()) { + this.host = other.host; + } + this.port = other.port; + this.uptimeSecs = other.uptimeSecs; + this.isLeader = other.isLeader; + if (other.is_set_version()) { + this.version = other.version; + } + } + + public NimbusSummary deepCopy() { + return new NimbusSummary(this); + } + + @Override + public void clear() { + this.host = null; + set_port_isSet(false); + this.port = 0; + set_uptimeSecs_isSet(false); + this.uptimeSecs = 0; + set_isLeader_isSet(false); + this.isLeader = false; + this.version = null; + } + + public String get_host() { + return this.host; + } + + public void set_host(String host) { + this.host = host; + } + + public void unset_host() { + this.host = null; + } + + /** Returns true if field host is set (has been assigned a value) and false otherwise */ + public boolean is_set_host() { + return this.host != null; + } + + public void set_host_isSet(boolean value) { + if (!value) { + this.host = null; + } + } + + public int get_port() { + return this.port; + } + + public void set_port(int port) { + this.port = port; + set_port_isSet(true); + } + + public void unset_port() { + __isset_bit_vector.clear(__PORT_ISSET_ID); + } + + /** Returns true if field port is set (has been assigned a value) and false otherwise */ + public boolean is_set_port() { + return __isset_bit_vector.get(__PORT_ISSET_ID); + } + + public void set_port_isSet(boolean value) { + __isset_bit_vector.set(__PORT_ISSET_ID, value); + } + + public int get_uptimeSecs() { + return this.uptimeSecs; + } + + public void set_uptimeSecs(int uptimeSecs) { + this.uptimeSecs = uptimeSecs; + set_uptimeSecs_isSet(true); + } + + public void unset_uptimeSecs() { + __isset_bit_vector.clear(__UPTIMESECS_ISSET_ID); + } + + /** Returns true if field uptimeSecs is set (has been assigned a value) and false otherwise */ + public boolean is_set_uptimeSecs() { + return __isset_bit_vector.get(__UPTIMESECS_ISSET_ID); + } + + public void set_uptimeSecs_isSet(boolean value) { + __isset_bit_vector.set(__UPTIMESECS_ISSET_ID, value); + } + + public boolean is_isLeader() { + return this.isLeader; + } + + public void set_isLeader(boolean isLeader) { + this.isLeader = isLeader; + set_isLeader_isSet(true); + } + + public void unset_isLeader() { + __isset_bit_vector.clear(__ISLEADER_ISSET_ID); + } + + /** Returns true if field isLeader is set (has been assigned a value) and false otherwise */ + public boolean is_set_isLeader() { + return __isset_bit_vector.get(__ISLEADER_ISSET_ID); + } + + public void set_isLeader_isSet(boolean value) { + __isset_bit_vector.set(__ISLEADER_ISSET_ID, value); + } + + public String get_version() { + return this.version; + } + + public void set_version(String version) { + this.version = version; + } + + public void unset_version() { + this.version = null; + } + + /** Returns true if field version is set (has been assigned a value) and false otherwise */ + public boolean is_set_version() { + return this.version != null; + } + + public void set_version_isSet(boolean value) { + if (!value) { + this.version = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case HOST: + if (value == null) { + unset_host(); + } else { + set_host((String)value); + } + break; + + case PORT: + if (value == null) { + unset_port(); + } else { + set_port((Integer)value); + } + break; + + case UPTIME_SECS: + if (value == null) { + unset_uptimeSecs(); + } else { + set_uptimeSecs((Integer)value); + } + break; + + case IS_LEADER: + if (value == null) { + unset_isLeader(); + } else { + set_isLeader((Boolean)value); + } + break; + + case VERSION: + if (value == null) { + unset_version(); + } else { + set_version((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case HOST: + return get_host(); + + case PORT: + return Integer.valueOf(get_port()); + + case UPTIME_SECS: + return Integer.valueOf(get_uptimeSecs()); + + case IS_LEADER: + return Boolean.valueOf(is_isLeader()); + + case VERSION: + return get_version(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case HOST: + return is_set_host(); + case PORT: + return is_set_port(); + case UPTIME_SECS: + return is_set_uptimeSecs(); + case IS_LEADER: + return is_set_isLeader(); + case VERSION: + return is_set_version(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof NimbusSummary) + return this.equals((NimbusSummary)that); + return false; + } + + public boolean equals(NimbusSummary that) { + if (that == null) + return false; + + boolean this_present_host = true && this.is_set_host(); + boolean that_present_host = true && that.is_set_host(); + if (this_present_host || that_present_host) { + if (!(this_present_host && that_present_host)) + return false; + if (!this.host.equals(that.host)) + return false; + } + + boolean this_present_port = true; + boolean that_present_port = true; + if (this_present_port || that_present_port) { + if (!(this_present_port && that_present_port)) + return false; + if (this.port != that.port) + return false; + } + + boolean this_present_uptimeSecs = true; + boolean that_present_uptimeSecs = true; + if (this_present_uptimeSecs || that_present_uptimeSecs) { + if (!(this_present_uptimeSecs && that_present_uptimeSecs)) + return false; + if (this.uptimeSecs != that.uptimeSecs) + return false; + } + + boolean this_present_isLeader = true; + boolean that_present_isLeader = true; + if (this_present_isLeader || that_present_isLeader) { + if (!(this_present_isLeader && that_present_isLeader)) + return false; + if (this.isLeader != that.isLeader) + return false; + } + + boolean this_present_version = true && this.is_set_version(); + boolean that_present_version = true && that.is_set_version(); + if (this_present_version || that_present_version) { + if (!(this_present_version && that_present_version)) + return false; + if (!this.version.equals(that.version)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_host = true && (is_set_host()); + builder.append(present_host); + if (present_host) + builder.append(host); + + boolean present_port = true; + builder.append(present_port); + if (present_port) + builder.append(port); + + boolean present_uptimeSecs = true; + builder.append(present_uptimeSecs); + if (present_uptimeSecs) + builder.append(uptimeSecs); + + boolean present_isLeader = true; + builder.append(present_isLeader); + if (present_isLeader) + builder.append(isLeader); + + boolean present_version = true && (is_set_version()); + builder.append(present_version); + if (present_version) + builder.append(version); + + return builder.toHashCode(); + } + + public int compareTo(NimbusSummary other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + NimbusSummary typedOther = (NimbusSummary)other; + + lastComparison = Boolean.valueOf(is_set_host()).compareTo(typedOther.is_set_host()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_host()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, typedOther.host); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_port()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, typedOther.port); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_uptimeSecs()).compareTo(typedOther.is_set_uptimeSecs()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_uptimeSecs()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.uptimeSecs, typedOther.uptimeSecs); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_isLeader()).compareTo(typedOther.is_set_isLeader()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_isLeader()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isLeader, typedOther.isLeader); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_version()).compareTo(typedOther.is_set_version()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_version()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.version, typedOther.version); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // HOST + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.host = iprot.readString(); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // PORT + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.port = iprot.readI32(); + set_port_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 3: // UPTIME_SECS + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.uptimeSecs = iprot.readI32(); + set_uptimeSecs_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 4: // IS_LEADER + if (field.type == org.apache.thrift.protocol.TType.BOOL) { + this.isLeader = iprot.readBool(); + set_isLeader_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 5: // VERSION + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.version = iprot.readString(); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.host != null) { + oprot.writeFieldBegin(HOST_FIELD_DESC); + oprot.writeString(this.host); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(PORT_FIELD_DESC); + oprot.writeI32(this.port); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC); + oprot.writeI32(this.uptimeSecs); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(IS_LEADER_FIELD_DESC); + oprot.writeBool(this.isLeader); + oprot.writeFieldEnd(); + if (this.version != null) { + oprot.writeFieldBegin(VERSION_FIELD_DESC); + oprot.writeString(this.version); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("NimbusSummary("); + boolean first = true; + + sb.append("host:"); + if (this.host == null) { + sb.append("null"); + } else { + sb.append(this.host); + } + first = false; + if (!first) sb.append(", "); + sb.append("port:"); + sb.append(this.port); + first = false; + if (!first) sb.append(", "); + sb.append("uptimeSecs:"); + sb.append(this.uptimeSecs); + first = false; + if (!first) sb.append(", "); + sb.append("isLeader:"); + sb.append(this.isLeader); + first = false; + if (!first) sb.append(", "); + sb.append("version:"); + if (this.version == null) { + sb.append("null"); + } else { + sb.append(this.version); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!is_set_host()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'host' is unset! Struct:" + toString()); + } + + if (!is_set_port()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString()); + } + + if (!is_set_uptimeSecs()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptimeSecs' is unset! Struct:" + toString()); + } + + if (!is_set_isLeader()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'isLeader' is unset! Struct:" + toString()); + } + + if (!is_set_version()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'version' is unset! Struct:" + toString()); + } + + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} + http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index e4fb751..a5e155c 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -2354,28 +2354,150 @@ class SupervisorSummary: def __ne__(self, other): return not (self == other) +class NimbusSummary: + """ + Attributes: + - host + - port + - uptimeSecs + - isLeader + - version + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'host', None, None, ), # 1 + (2, TType.I32, 'port', None, None, ), # 2 + (3, TType.I32, 'uptimeSecs', None, None, ), # 3 + (4, TType.BOOL, 'isLeader', None, None, ), # 4 + (5, TType.STRING, 'version', None, None, ), # 5 + ) + + def __hash__(self): + return 0 + hash(self.host) + hash(self.port) + hash(self.uptimeSecs) + hash(self.isLeader) + hash(self.version) + + def __init__(self, host=None, port=None, uptimeSecs=None, isLeader=None, version=None,): + self.host = host + self.port = port + self.uptimeSecs = uptimeSecs + self.isLeader = isLeader + self.version = version + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.host = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.port = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.uptimeSecs = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BOOL: + self.isLeader = iprot.readBool(); + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.version = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('NimbusSummary') + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRING, 1) + oprot.writeString(self.host.encode('utf-8')) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I32, 2) + oprot.writeI32(self.port) + oprot.writeFieldEnd() + if self.uptimeSecs is not None: + oprot.writeFieldBegin('uptimeSecs', TType.I32, 3) + oprot.writeI32(self.uptimeSecs) + oprot.writeFieldEnd() + if self.isLeader is not None: + oprot.writeFieldBegin('isLeader', TType.BOOL, 4) + oprot.writeBool(self.isLeader) + oprot.writeFieldEnd() + if self.version is not None: + oprot.writeFieldBegin('version', TType.STRING, 5) + oprot.writeString(self.version.encode('utf-8')) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.host is None: + raise TProtocol.TProtocolException(message='Required field host is unset!') + if self.port is None: + raise TProtocol.TProtocolException(message='Required field port is unset!') + if self.uptimeSecs is None: + raise TProtocol.TProtocolException(message='Required field uptimeSecs is unset!') + if self.isLeader is None: + raise TProtocol.TProtocolException(message='Required field isLeader is unset!') + if self.version is None: + raise TProtocol.TProtocolException(message='Required field version is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class ClusterSummary: """ Attributes: - supervisors - - nimbus_uptime_secs - topologies + - nimbuses """ thrift_spec = ( None, # 0 (1, TType.LIST, 'supervisors', (TType.STRUCT,(SupervisorSummary, SupervisorSummary.thrift_spec)), None, ), # 1 - (2, TType.I32, 'nimbus_uptime_secs', None, None, ), # 2 + None, # 2 (3, TType.LIST, 'topologies', (TType.STRUCT,(TopologySummary, TopologySummary.thrift_spec)), None, ), # 3 + (4, TType.LIST, 'nimbuses', (TType.STRUCT,(NimbusSummary, NimbusSummary.thrift_spec)), None, ), # 4 ) def __hash__(self): - return 0 + hash(self.supervisors) + hash(self.nimbus_uptime_secs) + hash(self.topologies) + return 0 + hash(self.supervisors) + hash(self.topologies) + hash(self.nimbuses) - def __init__(self, supervisors=None, nimbus_uptime_secs=None, topologies=None,): + def __init__(self, supervisors=None, topologies=None, nimbuses=None,): self.supervisors = supervisors - self.nimbus_uptime_secs = nimbus_uptime_secs self.topologies = topologies + self.nimbuses = nimbuses def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -2397,11 +2519,6 @@ class ClusterSummary: iprot.readListEnd() else: iprot.skip(ftype) - elif fid == 2: - if ftype == TType.I32: - self.nimbus_uptime_secs = iprot.readI32(); - else: - iprot.skip(ftype) elif fid == 3: if ftype == TType.LIST: self.topologies = [] @@ -2413,6 +2530,17 @@ class ClusterSummary: iprot.readListEnd() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.LIST: + self.nimbuses = [] + (_etype81, _size78) = iprot.readListBegin() + for _i82 in xrange(_size78): + _elem83 = NimbusSummary() + _elem83.read(iprot) + self.nimbuses.append(_elem83) + iprot.readListEnd() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -2426,19 +2554,22 @@ class ClusterSummary: if self.supervisors is not None: oprot.writeFieldBegin('supervisors', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.supervisors)) - for iter78 in self.supervisors: - iter78.write(oprot) + for iter84 in self.supervisors: + iter84.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() - if self.nimbus_uptime_secs is not None: - oprot.writeFieldBegin('nimbus_uptime_secs', TType.I32, 2) - oprot.writeI32(self.nimbus_uptime_secs) - oprot.writeFieldEnd() if self.topologies is not None: oprot.writeFieldBegin('topologies', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.topologies)) - for iter79 in self.topologies: - iter79.write(oprot) + for iter85 in self.topologies: + iter85.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.nimbuses is not None: + oprot.writeFieldBegin('nimbuses', TType.LIST, 4) + oprot.writeListBegin(TType.STRUCT, len(self.nimbuses)) + for iter86 in self.nimbuses: + iter86.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -2447,10 +2578,10 @@ class ClusterSummary: def validate(self): if self.supervisors is None: raise TProtocol.TProtocolException(message='Required field supervisors is unset!') - if self.nimbus_uptime_secs is None: - raise TProtocol.TProtocolException(message='Required field nimbus_uptime_secs is unset!') if self.topologies is None: raise TProtocol.TProtocolException(message='Required field topologies is unset!') + if self.nimbuses is None: + raise TProtocol.TProtocolException(message='Required field nimbuses is unset!') return @@ -2609,90 +2740,90 @@ class BoltStats: if fid == 1: if ftype == TType.MAP: self.acked = {} - (_ktype81, _vtype82, _size80 ) = iprot.readMapBegin() - for _i84 in xrange(_size80): - _key85 = iprot.readString().decode('utf-8') - _val86 = {} - (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin() - for _i91 in xrange(_size87): - _key92 = GlobalStreamId() - _key92.read(iprot) - _val93 = iprot.readI64(); - _val86[_key92] = _val93 + (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin() + for _i91 in xrange(_size87): + _key92 = iprot.readString().decode('utf-8') + _val93 = {} + (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin() + for _i98 in xrange(_size94): + _key99 = GlobalStreamId() + _key99.read(iprot) + _val100 = iprot.readI64(); + _val93[_key99] = _val100 iprot.readMapEnd() - self.acked[_key85] = _val86 + self.acked[_key92] = _val93 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.MAP: self.failed = {} - (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin() - for _i98 in xrange(_size94): - _key99 = iprot.readString().decode('utf-8') - _val100 = {} - (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin() - for _i105 in xrange(_size101): - _key106 = GlobalStreamId() - _key106.read(iprot) - _val107 = iprot.readI64(); - _val100[_key106] = _val107 + (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin() + for _i105 in xrange(_size101): + _key106 = iprot.readString().decode('utf-8') + _val107 = {} + (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin() + for _i112 in xrange(_size108): + _key113 = GlobalStreamId() + _key113.read(iprot) + _val114 = iprot.readI64(); + _val107[_key113] = _val114 iprot.readMapEnd() - self.failed[_key99] = _val100 + self.failed[_key106] = _val107 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.MAP: self.process_ms_avg = {} - (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin() - for _i112 in xrange(_size108): - _key113 = iprot.readString().decode('utf-8') - _val114 = {} - (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin() - for _i119 in xrange(_size115): - _key120 = GlobalStreamId() - _key120.read(iprot) - _val121 = iprot.readDouble(); - _val114[_key120] = _val121 + (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin() + for _i119 in xrange(_size115): + _key120 = iprot.readString().decode('utf-8') + _val121 = {} + (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin() + for _i126 in xrange(_size122): + _key127 = GlobalStreamId() + _key127.read(iprot) + _val128 = iprot.readDouble(); + _val121[_key127] = _val128 iprot.readMapEnd() - self.process_ms_avg[_key113] = _val114 + self.process_ms_avg[_key120] = _val121 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.MAP: self.executed = {} - (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin() - for _i126 in xrange(_size122): - _key127 = iprot.readString().decode('utf-8') - _val128 = {} - (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin() - for _i133 in xrange(_size129): - _key134 = GlobalStreamId() - _key134.read(iprot) - _val135 = iprot.readI64(); - _val128[_key134] = _val135 + (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin() + for _i133 in xrange(_size129): + _key134 = iprot.readString().decode('utf-8') + _val135 = {} + (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin() + for _i140 in xrange(_size136): + _key141 = GlobalStreamId() + _key141.read(iprot) + _val142 = iprot.readI64(); + _val135[_key141] = _val142 iprot.readMapEnd() - self.executed[_key127] = _val128 + self.executed[_key134] = _val135 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.MAP: self.execute_ms_avg = {} - (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin() - for _i140 in xrange(_size136): - _key141 = iprot.readString().decode('utf-8') - _val142 = {} - (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin() - for _i147 in xrange(_size143): - _key148 = GlobalStreamId() - _key148.read(iprot) - _val149 = iprot.readDouble(); - _val142[_key148] = _val149 + (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin() + for _i147 in xrange(_size143): + _key148 = iprot.readString().decode('utf-8') + _val149 = {} + (_ktype151, _vtype152, _size150 ) = iprot.readMapBegin() + for _i154 in xrange(_size150): + _key155 = GlobalStreamId() + _key155.read(iprot) + _val156 = iprot.readDouble(); + _val149[_key155] = _val156 iprot.readMapEnd() - self.execute_ms_avg[_key141] = _val142 + self.execute_ms_avg[_key148] = _val149 iprot.readMapEnd() else: iprot.skip(ftype) @@ -2709,60 +2840,60 @@ class BoltStats: if self.acked is not None: oprot.writeFieldBegin('acked', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked)) - for kiter150,viter151 in self.acked.items(): - oprot.writeString(kiter150.encode('utf-8')) - oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter151)) - for kiter152,viter153 in viter151.items(): - kiter152.write(oprot) - oprot.writeI64(viter153) + for kiter157,viter158 in self.acked.items(): + oprot.writeString(kiter157.encode('utf-8')) + oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter158)) + for kiter159,viter160 in viter158.items(): + kiter159.write(oprot) + oprot.writeI64(viter160) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.failed is not None: oprot.writeFieldBegin('failed', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed)) - for kiter154,viter155 in self.failed.items(): - oprot.writeString(kiter154.encode('utf-8')) - oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter155)) - for kiter156,viter157 in viter155.items(): - kiter156.write(oprot) - oprot.writeI64(viter157) + for kiter161,viter162 in self.failed.items(): + oprot.writeString(kiter161.encode('utf-8')) + oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter162)) + for kiter163,viter164 in viter162.items(): + kiter163.write(oprot) + oprot.writeI64(viter164) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.process_ms_avg is not None: oprot.writeFieldBegin('process_ms_avg', TType.MAP, 3) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.process_ms_avg)) - for kiter158,viter159 in self.process_ms_avg.items(): - oprot.writeString(kiter158.encode('utf-8')) - oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter159)) - for kiter160,viter161 in viter159.items(): - kiter160.write(oprot) - oprot.writeDouble(viter161) + for kiter165,viter166 in self.process_ms_avg.items(): + oprot.writeString(kiter165.encode('utf-8')) + oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter166)) + for kiter167,viter168 in viter166.items(): + kiter167.write(oprot) + oprot.writeDouble(viter168) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.executed is not None: oprot.writeFieldBegin('executed', TType.MAP, 4) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.executed)) - for kiter162,viter163 in self.executed.items(): - oprot.writeString(kiter162.encode('utf-8')) - oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter163)) - for kiter164,viter165 in viter163.items(): - kiter164.write(oprot) - oprot.writeI64(viter165) + for kiter169,viter170 in self.executed.items(): + oprot.writeString(kiter169.encode('utf-8')) + oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter170)) + for kiter171,viter172 in viter170.items(): + kiter171.write(oprot) + oprot.writeI64(viter172) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.execute_ms_avg is not None: oprot.writeFieldBegin('execute_ms_avg', TType.MAP, 5) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.execute_ms_avg)) - for kiter166,viter167 in self.execute_ms_avg.items(): - oprot.writeString(kiter166.encode('utf-8')) - oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter167)) - for kiter168,viter169 in viter167.items(): - kiter168.write(oprot) - oprot.writeDouble(viter169) + for kiter173,viter174 in self.execute_ms_avg.items(): + oprot.writeString(kiter173.encode('utf-8')) + oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter174)) + for kiter175,viter176 in viter174.items(): + kiter175.write(oprot) + oprot.writeDouble(viter176) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() @@ -2829,51 +2960,51 @@ class SpoutStats: if fid == 1: if ftype == TType.MAP: self.acked = {} - (_ktype171, _vtype172, _size170 ) = iprot.readMapBegin() - for _i174 in xrange(_size170): - _key175 = iprot.readString().decode('utf-8') - _val176 = {} - (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin() - for _i181 in xrange(_size177): - _key182 = iprot.readString().decode('utf-8') - _val183 = iprot.readI64(); - _val176[_key182] = _val183 + (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin() + for _i181 in xrange(_size177): + _key182 = iprot.readString().decode('utf-8') + _val183 = {} + (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin() + for _i188 in xrange(_size184): + _key189 = iprot.readString().decode('utf-8') + _val190 = iprot.readI64(); + _val183[_key189] = _val190 iprot.readMapEnd() - self.acked[_key175] = _val176 + self.acked[_key182] = _val183 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.MAP: self.failed = {} - (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin() - for _i188 in xrange(_size184): - _key189 = iprot.readString().decode('utf-8') - _val190 = {} - (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin() - for _i195 in xrange(_size191): - _key196 = iprot.readString().decode('utf-8') - _val197 = iprot.readI64(); - _val190[_key196] = _val197 + (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin() + for _i195 in xrange(_size191): + _key196 = iprot.readString().decode('utf-8') + _val197 = {} + (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin() + for _i202 in xrange(_size198): + _key203 = iprot.readString().decode('utf-8') + _val204 = iprot.readI64(); + _val197[_key203] = _val204 iprot.readMapEnd() - self.failed[_key189] = _val190 + self.failed[_key196] = _val197 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.MAP: self.complete_ms_avg = {} - (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin() - for _i202 in xrange(_size198): - _key203 = iprot.readString().decode('utf-8') - _val204 = {} - (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin() - for _i209 in xrange(_size205): - _key210 = iprot.readString().decode('utf-8') - _val211 = iprot.readDouble(); - _val204[_key210] = _val211 + (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin() + for _i209 in xrange(_size205): + _key210 = iprot.readString().decode('utf-8') + _val211 = {} + (_ktype213, _vtype214, _size212 ) = iprot.readMapBegin() + for _i216 in xrange(_size212): + _key217 = iprot.readString().decode('utf-8') + _val218 = iprot.readDouble(); + _val211[_key217] = _val218 iprot.readMapEnd() - self.complete_ms_avg[_key203] = _val204 + self.complete_ms_avg[_key210] = _val211 iprot.readMapEnd() else: iprot.skip(ftype) @@ -2890,36 +3021,36 @@ class SpoutStats: if self.acked is not None: oprot.writeFieldBegin('acked', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked)) - for kiter212,viter213 in self.acked.items(): - oprot.writeString(kiter212.encode('utf-8')) - oprot.writeMapBegin(TType.STRING, TType.I64, len(viter213)) - for kiter214,viter215 in viter213.items(): - oprot.writeString(kiter214.encode('utf-8')) - oprot.writeI64(viter215) + for kiter219,viter220 in self.acked.items(): + oprot.writeString(kiter219.encode('utf-8')) + oprot.writeMapBegin(TType.STRING, TType.I64, len(viter220)) + for kiter221,viter222 in viter220.items(): + oprot.writeString(kiter221.encode('utf-8')) + oprot.writeI64(viter222) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.failed is not None: oprot.writeFieldBegin('failed', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed)) - for kiter216,viter217 in self.failed.items(): - oprot.writeString(kiter216.encode('utf-8')) - oprot.writeMapBegin(TType.STRING, TType.I64, len(viter217)) - for kiter218,viter219 in viter217.items(): - oprot.writeString(kiter218.encode('utf-8')) - oprot.writeI64(viter219) + for kiter223,viter224 in self.failed.items(): + oprot.writeString(kiter223.encode('utf-8')) + oprot.writeMapBegin(TType.STRING, TType.I64, len(viter224)) + for kiter225,viter226 in viter224.items(): + oprot.writeString(kiter225.encode('utf-8')) + oprot.writeI64(viter226) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.complete_ms_avg is not None: oprot.writeFieldBegin('complete_ms_avg', TType.MAP, 3) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.complete_ms_avg)) - for kiter220,viter221 in self.complete_ms_avg.items(): - oprot.writeString(kiter220.encode('utf-8')) - oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter221)) - for kiter222,viter223 in viter221.items(): - oprot.writeString(kiter222.encode('utf-8')) - oprot.writeDouble(viter223) + for kiter227,viter228 in self.complete_ms_avg.items(): + oprot.writeString(kiter227.encode('utf-8')) + oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter228)) + for kiter229,viter230 in viter228.items(): + oprot.writeString(kiter229.encode('utf-8')) + oprot.writeDouble(viter230) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() @@ -3059,34 +3190,34 @@ class ExecutorStats: if fid == 1: if ftype == TType.MAP: self.emitted = {} - (_ktype225, _vtype226, _size224 ) = iprot.readMapBegin() - for _i228 in xrange(_size224): - _key229 = iprot.readString().decode('utf-8') - _val230 = {} - (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin() - for _i235 in xrange(_size231): - _key236 = iprot.readString().decode('utf-8') - _val237 = iprot.readI64(); - _val230[_key236] = _val237 + (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin() + for _i235 in xrange(_size231): + _key236 = iprot.readString().decode('utf-8') + _val237 = {} + (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin() + for _i242 in xrange(_size238): + _key243 = iprot.readString().decode('utf-8') + _val244 = iprot.readI64(); + _val237[_key243] = _val244 iprot.readMapEnd() - self.emitted[_key229] = _val230 + self.emitted[_key236] = _val237 iprot.readMapEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.MAP: self.transferred = {} - (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin() - for _i242 in xrange(_size238): - _key243 = iprot.readString().decode('utf-8') - _val244 = {} - (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin() - for _i249 in xrange(_size245): - _key250 = iprot.readString().decode('utf-8') - _val251 = iprot.readI64(); - _val244[_key250] = _val251 + (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin() + for _i249 in xrange(_size245): + _key250 = iprot.readString().decode('utf-8') + _val251 = {} + (_ktype253, _vtype254, _size252 ) = iprot.readMapBegin() + for _i256 in xrange(_size252): + _key257 = iprot.readString().decode('utf-8') + _val258 = iprot.readI64(); + _val251[_key257] = _val258 iprot.readMapEnd() - self.transferred[_key243] = _val244 + self.transferred[_key250] = _val251 iprot.readMapEnd() else: iprot.skip(ftype) @@ -3109,24 +3240,24 @@ class ExecutorStats: if self.emitted is not None: oprot.writeFieldBegin('emitted', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.emitted)) - for kiter252,viter253 in self.emitted.items(): - oprot.writeString(kiter252.encode('utf-8')) - oprot.writeMapBegin(TType.STRING, TType.I64, len(viter253)) - for kiter254,viter255 in viter253.items(): - oprot.writeString(kiter254.encode('utf-8')) - oprot.writeI64(viter255) + for kiter259,viter260 in self.emitted.items(): + oprot.writeString(kiter259.encode('utf-8')) + oprot.writeMapBegin(TType.STRING, TType.I64, len(viter260)) + for kiter261,viter262 in viter260.items(): + oprot.writeString(kiter261.encode('utf-8')) + oprot.writeI64(viter262) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() if self.transferred is not None: oprot.writeFieldBegin('transferred', TType.MAP, 2) oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.transferred)) - for kiter256,viter257 in self.transferred.items(): - oprot.writeString(kiter256.encode('utf-8')) - oprot.writeMapBegin(TType.STRING, TType.I64, len(viter257)) - for kiter258,viter259 in viter257.items(): - oprot.writeString(kiter258.encode('utf-8')) - oprot.writeI64(viter259) + for kiter263,viter264 in self.transferred.items(): + oprot.writeString(kiter263.encode('utf-8')) + oprot.writeMapBegin(TType.STRING, TType.I64, len(viter264)) + for kiter265,viter266 in viter264.items(): + oprot.writeString(kiter265.encode('utf-8')) + oprot.writeI64(viter266) oprot.writeMapEnd() oprot.writeMapEnd() oprot.writeFieldEnd() @@ -3947,11 +4078,11 @@ class TopologyInfo: elif fid == 4: if ftype == TType.LIST: self.executors = [] - (_etype263, _size260) = iprot.readListBegin() - for _i264 in xrange(_size260): - _elem265 = ExecutorSummary() - _elem265.read(iprot) - self.executors.append(_elem265) + (_etype270, _size267) = iprot.readListBegin() + for _i271 in xrange(_size267): + _elem272 = ExecutorSummary() + _elem272.read(iprot) + self.executors.append(_elem272) iprot.readListEnd() else: iprot.skip(ftype) @@ -3963,17 +4094,17 @@ class TopologyInfo: elif fid == 6: if ftype == TType.MAP: self.errors = {} - (_ktype267, _vtype268, _size266 ) = iprot.readMapBegin() - for _i270 in xrange(_size266): - _key271 = iprot.readString().decode('utf-8') - _val272 = [] - (_etype276, _size273) = iprot.readListBegin() - for _i277 in xrange(_size273): - _elem278 = ErrorInfo() - _elem278.read(iprot) - _val272.append(_elem278) + (_ktype274, _vtype275, _size273 ) = iprot.readMapBegin() + for _i277 in xrange(_size273): + _key278 = iprot.readString().decode('utf-8') + _val279 = [] + (_etype283, _size280) = iprot.readListBegin() + for _i284 in xrange(_size280): + _elem285 = ErrorInfo() + _elem285.read(iprot) + _val279.append(_elem285) iprot.readListEnd() - self.errors[_key271] = _val272 + self.errors[_key278] = _val279 iprot.readMapEnd() else: iprot.skip(ftype) @@ -4017,8 +4148,8 @@ class TopologyInfo: if self.executors is not None: oprot.writeFieldBegin('executors', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.executors)) - for iter279 in self.executors: - iter279.write(oprot) + for iter286 in self.executors: + iter286.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.status is not None: @@ -4028,11 +4159,11 @@ class TopologyInfo: if self.errors is not None: oprot.writeFieldBegin('errors', TType.MAP, 6) oprot.writeMapBegin(TType.STRING, TType.LIST, len(self.errors)) - for kiter280,viter281 in self.errors.items(): - oprot.writeString(kiter280.encode('utf-8')) - oprot.writeListBegin(TType.STRUCT, len(viter281)) - for iter282 in viter281: - iter282.write(oprot) + for kiter287,viter288 in self.errors.items(): + oprot.writeString(kiter287.encode('utf-8')) + oprot.writeListBegin(TType.STRUCT, len(viter288)) + for iter289 in viter288: + iter289.write(oprot) oprot.writeListEnd() oprot.writeMapEnd() oprot.writeFieldEnd() @@ -4186,11 +4317,11 @@ class RebalanceOptions: elif fid == 3: if ftype == TType.MAP: self.num_executors = {} - (_ktype284, _vtype285, _size283 ) = iprot.readMapBegin() - for _i287 in xrange(_size283): - _key288 = iprot.readString().decode('utf-8') - _val289 = iprot.readI32(); - self.num_executors[_key288] = _val289 + (_ktype291, _vtype292, _size290 ) = iprot.readMapBegin() + for _i294 in xrange(_size290): + _key295 = iprot.readString().decode('utf-8') + _val296 = iprot.readI32(); + self.num_executors[_key295] = _val296 iprot.readMapEnd() else: iprot.skip(ftype) @@ -4215,9 +4346,9 @@ class RebalanceOptions: if self.num_executors is not None: oprot.writeFieldBegin('num_executors', TType.MAP, 3) oprot.writeMapBegin(TType.STRING, TType.I32, len(self.num_executors)) - for kiter290,viter291 in self.num_executors.items(): - oprot.writeString(kiter290.encode('utf-8')) - oprot.writeI32(viter291) + for kiter297,viter298 in self.num_executors.items(): + oprot.writeString(kiter297.encode('utf-8')) + oprot.writeI32(viter298) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -4267,11 +4398,11 @@ class Credentials: if fid == 1: if ftype == TType.MAP: self.creds = {} - (_ktype293, _vtype294, _size292 ) = iprot.readMapBegin() - for _i296 in xrange(_size292): - _key297 = iprot.readString().decode('utf-8') - _val298 = iprot.readString().decode('utf-8') - self.creds[_key297] = _val298 + (_ktype300, _vtype301, _size299 ) = iprot.readMapBegin() + for _i303 in xrange(_size299): + _key304 = iprot.readString().decode('utf-8') + _val305 = iprot.readString().decode('utf-8') + self.creds[_key304] = _val305 iprot.readMapEnd() else: iprot.skip(ftype) @@ -4288,9 +4419,9 @@ class Credentials: if self.creds is not None: oprot.writeFieldBegin('creds', TType.MAP, 1) oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.creds)) - for kiter299,viter300 in self.creds.items(): - oprot.writeString(kiter299.encode('utf-8')) - oprot.writeString(viter300.encode('utf-8')) + for kiter306,viter307 in self.creds.items(): + oprot.writeString(kiter306.encode('utf-8')) + oprot.writeString(viter307.encode('utf-8')) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
