upmerge from master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39ea23cd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39ea23cd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39ea23cd Branch: refs/heads/master Commit: 39ea23cdbbf64b3844205ce759701ec78d688c4a Parents: 4e0ff2f 96f81d7 Author: å«ä¹ <[email protected]> Authored: Sat Mar 5 20:02:42 2016 +0800 Committer: å«ä¹ <[email protected]> Committed: Sat Mar 5 20:02:42 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 7 + .../src/clj/org/apache/storm/MockAutoCred.clj | 58 -- .../clj/org/apache/storm/daemon/supervisor.clj | 15 + .../storm/cluster/StormClusterStateImpl.java | 7 +- .../storm/daemon/metrics/MetricsUtils.java | 2 +- .../jvm/org/apache/storm/drpc/DRPCSpout.java | 2 + .../apache/storm/security/auth/AuthUtils.java | 40 + .../storm/security/auth/kerberos/AutoTGT.java | 64 +- .../auth/kerberos/AutoTGTKrb5LoginModule.java | 8 +- .../apache/storm/stats/BoltExecutorStats.java | 6 +- .../jvm/org/apache/storm/stats/CommonStats.java | 12 +- .../apache/storm/stats/SpoutExecutorStats.java | 5 +- .../jvm/org/apache/storm/stats/StatsUtil.java | 781 ++++++++++--------- .../jvm/org/apache/storm/utils/ConfigUtils.java | 10 + .../test/clj/org/apache/storm/nimbus_test.clj | 10 +- .../security/auth/auto_login_module_test.clj | 24 +- .../clj/org/apache/storm/supervisor_test.clj | 6 + .../test/jvm/org/apache/storm/MockAutoCred.java | 75 ++ 18 files changed, 623 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index f6dad09,0000000..d8c7f06 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@@ -1,118 -1,0 +1,114 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.stats; + +import com.google.common.collect.Lists; +import java.util.HashMap; +import java.util.List; +import java.util.Map; - import org.apache.storm.generated.BoltStats; - import org.apache.storm.generated.ExecutorSpecificStats; - import org.apache.storm.generated.ExecutorStats; - import org.apache.storm.generated.SpoutStats; +import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; + +@SuppressWarnings("unchecked") +public class BoltExecutorStats extends CommonStats { + + public static final String ACKED = "acked"; + public static final String FAILED = "failed"; + public static final String EXECUTED = "executed"; + public static final String PROCESS_LATENCIES = "process-latencies"; + public static final String EXECUTE_LATENCIES = "execute-latencies"; + + public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES}; + + public BoltExecutorStats(int rate) { + super(rate); + + this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); + this.put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); + } + + public MultiCountStatAndMetric getAcked() { + return (MultiCountStatAndMetric) this.get(ACKED); + } + + public MultiCountStatAndMetric getFailed() { + return (MultiCountStatAndMetric) this.get(FAILED); + } + + public MultiCountStatAndMetric getExecuted() { + return (MultiCountStatAndMetric) this.get(EXECUTED); + } + + public MultiLatencyStatAndMetric getProcessLatencies() { + return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES); + } + + public MultiLatencyStatAndMetric getExecuteLatencies() { + return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES); + } + + public void boltExecuteTuple(String component, String stream, long latencyMs) { + List key = Lists.newArrayList(component, stream); + this.getExecuted().incBy(key, this.rate); + this.getExecuteLatencies().record(key, latencyMs); + } + + public void boltAckedTuple(String component, String stream, long latencyMs) { + List key = Lists.newArrayList(component, stream); + this.getAcked().incBy(key, this.rate); + this.getProcessLatencies().record(key, latencyMs); + } + + public void boltFailedTuple(String component, String stream, long latencyMs) { + List key = Lists.newArrayList(component, stream); + this.getFailed().incBy(key, this.rate); + + } + + public Map renderStats() { + cleanupStats(); + Map ret = new HashMap(); + ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS)); - StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); ++ StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); + + return ret; + } + +// public ExecutorStats renderStats() { +// cleanupStats(); +// +// ExecutorStats ret = new ExecutorStats(); +// ret.set_emitted(valueStat(EMITTED)); +// ret.set_transferred(valueStat(TRANSFERRED)); +// ret.set_rate(this.rate); +// +// BoltStats boltStats = new BoltStats( +// StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); +// ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); +// +// return ret; +// } +} http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/stats/CommonStats.java index e386413,0000000..f7826f9 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java @@@ -1,112 -1,0 +1,112 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; + +@SuppressWarnings("unchecked") +public class CommonStats { + public static final int NUM_STAT_BUCKETS = 20; + + public static final String RATE = "rate"; + + public static final String EMITTED = "emitted"; + public static final String TRANSFERRED = "transferred"; + public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED}; + + protected final int rate; - protected final Map metricMap = new HashMap(); ++ protected final Map<String, IMetric> metricMap = new HashMap<>(); + + public CommonStats(int rate) { + this.rate = rate; + this.put(EMITTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(TRANSFERRED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + } + + public int getRate() { + return this.rate; + } + + public MultiCountStatAndMetric getEmitted() { + return (MultiCountStatAndMetric) get(EMITTED); + } + + public MultiCountStatAndMetric getTransferred() { + return (MultiCountStatAndMetric) get(TRANSFERRED); + } + + public IMetric get(String field) { - return (IMetric) StatsUtil.getByKeyword(metricMap, field); ++ return (IMetric) StatsUtil.getByKey(metricMap, field); + } + + protected void put(String field, Object value) { - StatsUtil.putRawKV(metricMap, field, value); ++ StatsUtil.putKV(metricMap, field, value); + } + + public void emittedTuple(String stream) { + this.getEmitted().incBy(stream, this.rate); + } + + public void transferredTuples(String stream, int amount) { + this.getTransferred().incBy(stream, this.rate * amount); + } + + public void cleanupStats() { + for (Object imetric : this.metricMap.values()) { + cleanupStat((IMetric) imetric); + } + } + + private void cleanupStat(IMetric metric) { + if (metric instanceof MultiCountStatAndMetric) { + ((MultiCountStatAndMetric) metric).close(); + } else if (metric instanceof MultiLatencyStatAndMetric) { + ((MultiLatencyStatAndMetric) metric).close(); + } + } + + protected Map valueStats(String[] fields) { + Map ret = new HashMap(); + for (String field : fields) { + IMetric metric = this.get(field); + if (metric instanceof MultiCountStatAndMetric) { - StatsUtil.putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); ++ StatsUtil.putKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); + } else if (metric instanceof MultiLatencyStatAndMetric) { - StatsUtil.putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); ++ StatsUtil.putKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); + } + } - StatsUtil.putRawKV(ret, CommonStats.RATE, this.getRate()); ++ StatsUtil.putKV(ret, CommonStats.RATE, this.getRate()); + + return ret; + } + + protected Map valueStat(String field) { + IMetric metric = this.get(field); + if (metric instanceof MultiCountStatAndMetric) { + return ((MultiCountStatAndMetric) metric).getTimeCounts(); + } else if (metric instanceof MultiLatencyStatAndMetric) { + return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 918ae06,0000000..27c626e mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@@ -1,89 -1,0 +1,86 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.Map; - import org.apache.storm.generated.ExecutorSpecificStats; - import org.apache.storm.generated.ExecutorStats; - import org.apache.storm.generated.SpoutStats; +import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; + +@SuppressWarnings("unchecked") +public class SpoutExecutorStats extends CommonStats { + + public static final String ACKED = "acked"; + public static final String FAILED = "failed"; + public static final String COMPLETE_LATENCIES = "complete-latencies"; + + public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES}; + + public SpoutExecutorStats(int rate) { + super(rate); + this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(COMPLETE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); + } + + public MultiCountStatAndMetric getAcked() { + return (MultiCountStatAndMetric) this.get(ACKED); + } + + public MultiCountStatAndMetric getFailed() { + return (MultiCountStatAndMetric) this.get(FAILED); + } + + public MultiLatencyStatAndMetric getCompleteLatencies() { + return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES); + } + + public void spoutAckedTuple(String stream, long latencyMs) { + this.getAcked().incBy(stream, this.rate); + this.getCompleteLatencies().record(stream, latencyMs); + } + + public void spoutFailedTuple(String stream, long latencyMs) { + this.getFailed().incBy(stream, this.rate); + } + + public Map renderStats() { + cleanupStats(); + Map ret = new HashMap(); + ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS)); - StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT); ++ StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT); + + return ret; + } + +// public ExecutorStats renderStats() { +// cleanupStats(); +// +// ExecutorStats ret = new ExecutorStats(); +// ret.set_emitted(valueStat(EMITTED)); +// ret.set_transferred(valueStat(TRANSFERRED)); +// ret.set_rate(this.rate); +// +// SpoutStats spoutStats = new SpoutStats( +// valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES)); +// ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); +// +// return ret; +// } +}
