http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java index 1d77089..7db0ed4 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java @@ -17,80 +17,99 @@ */ package com.alibaba.jstorm.metric; -import java.util.Map; -import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.common.metric.Histogram; -import com.alibaba.jstorm.common.metric.MetricRegistry; -import com.alibaba.jstorm.common.metric.window.Metric; - -public class SimpleJStormMetric extends JStormMetrics implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(SimpleJStormMetric.class); - - protected static MetricRegistry metrics = JStormMetrics.workerMetrics; - static { - Metric.setEnable(true); +import com.alibaba.jstorm.common.metric.*; +import com.codahale.metrics.Gauge; + +/** + * simplified metrics, only for worker metrics. all metrics are logged locally without reporting to TM or nimbus. + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class SimpleJStormMetric extends JStormMetrics { + private static final long serialVersionUID = 7468005641982249536L; + + protected static final AsmMetricRegistry metrics = JStormMetrics.getWorkerMetrics(); + + public static void updateNimbusHistogram(String name, Number obj) { + updateHistogram(NIMBUS_METRIC_KEY, name, obj); } - - protected static SimpleJStormMetric instance = null; - - - public static SimpleJStormMetric mkInstance() { - synchronized (SimpleJStormMetric.class) { - if (instance == null) { - instance = new SimpleJStormMetric(); - } - - return instance; - } + + public static void updateSupervisorHistogram(String name, Number obj) { + updateHistogram(SUPERVISOR_METRIC_KEY, name, obj); } - - protected SimpleJStormMetric() { - + + public static void updateNimbusMeter(String name, Number obj) { + updateMeter(NIMBUS_METRIC_KEY, name, obj); + } + + public static void updateSupervisorMeter(String name, Number obj) { + updateMeter(SUPERVISOR_METRIC_KEY, name, obj); + } + + public static void updateNimbusCounter(String name, Number obj) { + updateCounter(NIMBUS_METRIC_KEY, name, obj); } - - public static Histogram registerHistorgram(String name) { - return JStormMetrics.registerWorkerHistogram(name); + + public static void updateSupervisorCounter(String name, Number obj) { + updateCounter(SUPERVISOR_METRIC_KEY, name, obj); } - - public static void updateHistorgram(String name, Number obj) { - LOG.debug(name + ":" + obj.doubleValue()); - Histogram histogram = (Histogram)metrics.getMetric(name); + + public static void updateHistogram(String key, String name, Number obj) { + String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.HISTOGRAM); + AsmHistogram histogram = (AsmHistogram) metrics.getMetric(formalName); if (histogram == null) { - try { - histogram = registerHistorgram(name); - }catch(Exception e) { - LOG.info("{} has been register", name); - return; - } + histogram = registerHistogram(name); } - + histogram.update(obj); - } - @Override - public void run() { - // TODO Auto-generated method stub - Map<String, Metric> map = metrics.getMetrics(); - - for (Entry<String, Metric> entry : map.entrySet()) { - String key = entry.getKey(); - Metric metric = entry.getValue(); - - LOG.info(key + ":" + metric.getSnapshot()); + public static void updateMeter(String key, String name, Number obj) { + String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.METER); + AsmMeter meter = (AsmMeter) metrics.getMetric(formalName); + if (meter == null) { + meter = registerMeter(name); } + + meter.update(obj); + } + + public static void updateCounter(String key, String name, Number obj) { + String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.COUNTER); + AsmCounter counter = (AsmCounter) metrics.getMetric(formalName); + if (counter == null) { + counter = registerCounter(name); + } + + counter.update(obj); + } + + private static AsmGauge registerGauge(Gauge<Double> gauge, String name) { + AsmGauge gauge1 = new AsmGauge(gauge); + gauge1.setOp(AsmMetric.MetricOp.LOG); + + return registerWorkerGauge(topologyId, name, gauge1); } - - - public static void main(String[] args) { - updateHistorgram("test", 11100.0); - - SimpleJStormMetric instance = new SimpleJStormMetric(); - - instance.run(); + + private static AsmHistogram registerHistogram(String name) { + AsmHistogram histogram = new AsmHistogram(); + histogram.setOp(AsmMetric.MetricOp.LOG); + + return registerWorkerHistogram(NIMBUS_METRIC_KEY, name, histogram); + } + + public static AsmMeter registerMeter(String name) { + AsmMeter meter = new AsmMeter(); + meter.setOp(AsmMetric.MetricOp.LOG); + + return registerWorkerMeter(NIMBUS_METRIC_KEY, name, meter); + } + + public static AsmCounter registerCounter(String name) { + AsmCounter counter = new AsmCounter(); + counter.setOp(AsmMetric.MetricOp.LOG); + + return registerWorkerCounter(NIMBUS_METRIC_KEY, name, counter); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java new file mode 100644 index 0000000..e44e7b5 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java @@ -0,0 +1,52 @@ +package com.alibaba.jstorm.metric; + +import java.util.concurrent.TimeUnit; + +/** + * a simple util class to calculate run time + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class TimeTicker { + private TimeUnit unit; + private long start; + private long end; + + public TimeTicker(TimeUnit unit) { + if (unit != TimeUnit.NANOSECONDS && unit != TimeUnit.MILLISECONDS) { + throw new IllegalArgumentException("invalid unit!"); + } + this.unit = unit; + } + + public TimeTicker(TimeUnit unit, boolean start) { + this(unit); + if (start) { + start(); + } + } + + public void start() { + if (unit == TimeUnit.MILLISECONDS) { + this.start = System.currentTimeMillis(); + } else if (unit == TimeUnit.NANOSECONDS) { + this.start = System.nanoTime(); + } + } + + public long stop() { + if (unit == TimeUnit.MILLISECONDS) { + this.end = System.currentTimeMillis(); + } else if (unit == TimeUnit.NANOSECONDS) { + this.end = System.nanoTime(); + } + return end - start; + } + + public long stopAndRestart() { + long elapsed = stop(); + start(); + return elapsed; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java new file mode 100644 index 0000000..8dae281 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java @@ -0,0 +1,528 @@ +package com.alibaba.jstorm.metric; + +import backtype.storm.generated.MetricInfo; +import backtype.storm.generated.MetricSnapshot; +import backtype.storm.generated.TopologyMetric; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import org.apache.commons.lang.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A topology metric context contains all in-memory metric data of a topology. + * This class resides in TopologyMaster. + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class TopologyMetricContext { + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final ReentrantLock lock = new ReentrantLock(); + private Set<ResourceWorkerSlot> workerSet; + private int taskNum = 1; + private ConcurrentMap<String, MetricInfo> memCache = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, Long> memMeta = new ConcurrentHashMap<>(); + private final AtomicBoolean isMerging = new AtomicBoolean(false); + private String topologyId; + private volatile int flushedMetaNum = 0; + + /** + * sync meta from metric cache on startup + */ + private volatile boolean syncMeta = false; + + private Map conf; + + public TopologyMetricContext() { + } + + public TopologyMetricContext(Set<ResourceWorkerSlot> workerSet) { + this.workerSet = workerSet; + } + + public TopologyMetricContext(String topologyId, Set<ResourceWorkerSlot> workerSet, Map conf) { + this(workerSet); + this.topologyId = topologyId; + this.conf = conf; + } + + public ConcurrentMap<String, Long> getMemMeta() { + return memMeta; + } + + public String getTopologyId() { + return topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public boolean syncMeta() { + return syncMeta; + } + + public void setSyncMeta(boolean syncMeta) { + this.syncMeta = syncMeta; + } + + public int getTaskNum() { + return taskNum; + } + + public void setTaskNum(int taskNum) { + this.taskNum = taskNum; + } + + public int getFlushedMetaNum() { + return flushedMetaNum; + } + + public void setFlushedMetaNum(int flushedMetaNum) { + this.flushedMetaNum = flushedMetaNum; + } + + public ReentrantLock getLock() { + return lock; + } + + public int getWorkerNum() { + return workerSet.size(); + } + + public void setWorkerSet(Set<ResourceWorkerSlot> workerSet) { + this.workerSet = workerSet; + } + + public void resetUploadedMetrics() { + this.memCache.clear(); + } + + public final ConcurrentMap<String, MetricInfo> getMemCache() { + return memCache; + } + + public void addToMemCache(String workerSlot, MetricInfo metricInfo) { + memCache.put(workerSlot, metricInfo); + LOG.info("update mem cache, worker:{}, total uploaded:{}", workerSlot, memCache.size()); + } + + public boolean readyToUpload() { + return memCache.size() >= workerSet.size(); + } + + public boolean isMerging() { + return isMerging.get(); + } + + public void setMerging(boolean isMerging) { + this.isMerging.set(isMerging); + } + + public int getUploadedWorkerNum() { + return memCache.size(); + } + + public TopologyMetric mergeMetrics() { + long start = System.currentTimeMillis(); + + if (getMemCache().size() == 0) { + //LOG.info("topology:{}, metric size is 0, skip...", topologyId); + return null; + } + if (isMerging()) { + LOG.info("topology {} is already merging, skip...", topologyId); + return null; + } + + setMerging(true); + + try { + Map<String, MetricInfo> workerMetricMap = this.memCache; + // reset mem cache + this.memCache = new ConcurrentHashMap<>(); + + MetricInfo topologyMetrics = MetricUtils.mkMetricInfo(); + MetricInfo componentMetrics = MetricUtils.mkMetricInfo(); + MetricInfo taskMetrics = MetricUtils.mkMetricInfo(); + MetricInfo streamMetrics = MetricUtils.mkMetricInfo(); + MetricInfo workerMetrics = MetricUtils.mkMetricInfo(); + MetricInfo nettyMetrics = MetricUtils.mkMetricInfo(); + TopologyMetric tpMetric = + new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics); + + + // metric name => worker count + Map<String, Integer> metricNameCounters = new HashMap<>(); + + // special for histograms & timers, we merge the points to get a new snapshot data. + Map<String, Map<Integer, Histogram>> histograms = new HashMap<>(); + Map<String, Map<Integer, Timer>> timers = new HashMap<>(); + + // iterate metrics of all workers within the same topology + for (ConcurrentMap.Entry<String, MetricInfo> metricEntry : workerMetricMap.entrySet()) { + MetricInfo metricInfo = metricEntry.getValue(); + + // merge counters: add old and new values, note we only add incoming new metrics and overwrite + // existing data, same for all below. + Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics(); + for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : metrics.entrySet()) { + String metricName = metric.getKey(); + Map<Integer, MetricSnapshot> data = metric.getValue(); + MetaType metaType = MetricUtils.metaType(metricName); + + MetricType metricType = MetricUtils.metricType(metricName); + if (metricType == MetricType.COUNTER) { + mergeCounters(tpMetric, metaType, metricName, data); + } else if (metricType == MetricType.GAUGE) { + mergeGauges(tpMetric, metaType, metricName, data); + } else if (metricType == MetricType.METER) { + mergeMeters(getMetricInfoByType(tpMetric, metaType), metricName, data, metricNameCounters); + } else if (metricType == MetricType.HISTOGRAM) { + mergeHistograms(getMetricInfoByType(tpMetric, metaType), + metricName, data, metricNameCounters, histograms); + } else if (metricType == MetricType.TIMER) { + mergeTimers(getMetricInfoByType(tpMetric, metaType), + metricName, data, metricNameCounters, timers); + } + } + } + adjustHistogramTimerMetrics(tpMetric, metricNameCounters, histograms, timers); + // for counters, we only report delta data every time, need to sum with old data + //adjustCounterMetrics(tpMetric, oldTpMetric); + + LOG.info("merge topology metrics:{}, cost:{}", topologyId, System.currentTimeMillis() - start); + // debug logs + //MetricUtils.printMetricWinSize(componentMetrics); + + return tpMetric; + } finally { + setMerging(false); + } + } + + + protected MetricInfo getMetricInfoByType(TopologyMetric topologyMetric, MetaType type) { + if (type == MetaType.TASK) { + return topologyMetric.get_taskMetric(); + } else if (type == MetaType.WORKER) { + return topologyMetric.get_workerMetric(); + } else if (type == MetaType.COMPONENT) { + return topologyMetric.get_componentMetric(); + } else if (type == MetaType.STREAM) { + return topologyMetric.get_streamMetric(); + } else if (type == MetaType.NETTY) { + return topologyMetric.get_nettyMetric(); + } else if (type == MetaType.TOPOLOGY) { + return topologyMetric.get_topologyMetric(); + } + return null; + } + + public void mergeCounters(TopologyMetric tpMetric, MetaType metaType, String meta, + Map<Integer, MetricSnapshot> data) { + MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType); + Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta); + if (existing == null) { + metricInfo.put_to_metrics(meta, data); + } else { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Integer win = dataEntry.getKey(); + MetricSnapshot snapshot = dataEntry.getValue(); + MetricSnapshot old = existing.get(win); + if (old == null) { + existing.put(win, snapshot); + } else { + old.set_ts(snapshot.get_ts()); + old.set_longValue(old.get_longValue() + snapshot.get_longValue()); + } + } + } + } + + public void mergeGauges(TopologyMetric tpMetric, MetaType metaType, String meta, + Map<Integer, MetricSnapshot> data) { + MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType); + Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta); + if (existing == null) { + metricInfo.put_to_metrics(meta, data); + } else { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Integer win = dataEntry.getKey(); + MetricSnapshot snapshot = dataEntry.getValue(); + MetricSnapshot old = existing.get(win); + if (old == null) { + existing.put(win, snapshot); + } else { + if (snapshot.get_ts() >= old.get_ts()) { + old.set_ts(snapshot.get_ts()); + if (metaType != MetaType.TOPOLOGY) { + old.set_doubleValue(snapshot.get_doubleValue()); + } else { // for topology metric, gauge might be add-able, e.g., cpu, memory, etc. + old.set_doubleValue(old.get_doubleValue() + snapshot.get_doubleValue()); + } + } + } + } + } + } + + /** + * meters are not sampled. + */ + public void mergeMeters(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data, + Map<String, Integer> metaCounters) { + Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta); + if (existing == null) { + metricInfo.put_to_metrics(meta, data); + } else { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Integer win = dataEntry.getKey(); + MetricSnapshot snapshot = dataEntry.getValue(); + MetricSnapshot old = existing.get(win); + if (old == null) { + existing.put(win, snapshot); + } else { + if (snapshot.get_ts() >= old.get_ts()) { + old.set_ts(snapshot.get_ts()); + old.set_mean(old.get_mean() + snapshot.get_mean()); + old.set_m1(old.get_m1() + snapshot.get_m1()); + old.set_m5(old.get_m5() + snapshot.get_m5()); + old.set_m15(old.get_m15() + snapshot.get_m15()); + } + } + } + } + updateMetricCounters(meta, metaCounters); + } + + /** + * histograms are sampled, but we just update points + */ + public void mergeHistograms(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data, + Map<String, Integer> metaCounters, Map<String, Map<Integer, Histogram>> histograms) { + Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta); + if (existing == null) { + metricInfo.put_to_metrics(meta, data); + Map<Integer, Histogram> histogramMap = new HashMap<>(); + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Histogram histogram = MetricUtils.metricSnapshot2Histogram(dataEntry.getValue()); + histogramMap.put(dataEntry.getKey(), histogram); + } + histograms.put(meta, histogramMap); + } else { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Integer win = dataEntry.getKey(); + MetricSnapshot snapshot = dataEntry.getValue(); + MetricSnapshot old = existing.get(win); + if (old == null) { + existing.put(win, snapshot); + histograms.get(meta).put(win, MetricUtils.metricSnapshot2Histogram(snapshot)); + } else { + if (snapshot.get_ts() >= old.get_ts()) { + old.set_ts(snapshot.get_ts()); + // update points + MetricUtils.updateHistogramPoints(histograms.get(meta).get(win), snapshot.get_points()); + } + } + } + } + updateMetricCounters(meta, metaCounters); + } + + /** + * timers are sampled, we just update points + */ + public void mergeTimers(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data, + Map<String, Integer> metaCounters, Map<String, Map<Integer, Timer>> timers) { + Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta); + if (existing == null) { + metricInfo.put_to_metrics(meta, data); + Map<Integer, Timer> timerMap = new HashMap<>(); + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Timer timer = MetricUtils.metricSnapshot2Timer(dataEntry.getValue()); + timerMap.put(dataEntry.getKey(), timer); + } + timers.put(meta, timerMap); + } else { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) { + Integer win = dataEntry.getKey(); + MetricSnapshot snapshot = dataEntry.getValue(); + MetricSnapshot old = existing.get(win); + if (old == null) { + existing.put(win, snapshot); + timers.get(meta).put(win, MetricUtils.metricSnapshot2Timer(snapshot)); + } else { + if (snapshot.get_ts() >= old.get_ts()) { + old.set_ts(snapshot.get_ts()); + old.set_m1(old.get_m1() + snapshot.get_m1()); + old.set_m5(old.get_m5() + snapshot.get_m5()); + old.set_m15(old.get_m15() + snapshot.get_m15()); + + // update points + MetricUtils.updateTimerPoints(timers.get(meta).get(win), snapshot.get_points()); + } + } + } + } + updateMetricCounters(meta, metaCounters); + } + + /** + * computes occurrences of specified metric name + */ + protected void updateMetricCounters(String metricName, Map<String, Integer> metricNameCounters) { + if (metricNameCounters.containsKey(metricName)) { + metricNameCounters.put(metricName, metricNameCounters.get(metricName) + 1); + } else { + metricNameCounters.put(metricName, 1); + } + } + + protected void adjustHistogramTimerMetrics(TopologyMetric tpMetric, Map<String, Integer> metaCounters, + Map<String, Map<Integer, Histogram>> histograms, + Map<String, Map<Integer, Timer>> timers) { + resetPoints(tpMetric.get_taskMetric().get_metrics()); + resetPoints(tpMetric.get_streamMetric().get_metrics()); + resetPoints(tpMetric.get_nettyMetric().get_metrics()); + resetPoints(tpMetric.get_workerMetric().get_metrics()); + + Map<String, Map<Integer, MetricSnapshot>> compMetrics = + tpMetric.get_componentMetric().get_metrics(); + Map<String, Map<Integer, MetricSnapshot>> topologyMetrics = + tpMetric.get_topologyMetric().get_metrics(); + + adjustMetrics(compMetrics, metaCounters, histograms, timers); + adjustMetrics(topologyMetrics, metaCounters, histograms, timers); + } + + private void adjustMetrics(Map<String, Map<Integer, MetricSnapshot>> metrics, Map<String, Integer> metaCounters, + Map<String, Map<Integer, Histogram>> histograms, Map<String, Map<Integer, Timer>> timers) { + for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) { + String meta = metricEntry.getKey(); + MetricType metricType = MetricUtils.metricType(meta); + MetaType metaType = MetricUtils.metaType(meta); + Map<Integer, MetricSnapshot> winData = metricEntry.getValue(); + + if (metricType == MetricType.HISTOGRAM) { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) { + MetricSnapshot snapshot = dataEntry.getValue(); + Integer cnt = metaCounters.get(meta); + Histogram histogram = histograms.get(meta).get(dataEntry.getKey()); + if (cnt != null && cnt > 1) { + + Snapshot snapshot1 = histogram.getSnapshot(); + snapshot.set_mean(snapshot1.getMean()); + snapshot.set_p50(snapshot1.getMedian()); + snapshot.set_p75(snapshot1.get75thPercentile()); + snapshot.set_p95(snapshot1.get95thPercentile()); + snapshot.set_p98(snapshot1.get98thPercentile()); + snapshot.set_p99(snapshot1.get99thPercentile()); + snapshot.set_p999(snapshot1.get999thPercentile()); + snapshot.set_stddev(snapshot1.getStdDev()); + snapshot.set_min(snapshot1.getMin()); + snapshot.set_max(snapshot1.getMax()); + + if (metaType == MetaType.TOPOLOGY) { + snapshot.set_points(Arrays.asList(ArrayUtils.toObject(snapshot1.getValues()))); + } + } + if (metaType != MetaType.TOPOLOGY) { + snapshot.set_points(new ArrayList<Long>(0)); + } + } + + } else if (metricType == MetricType.TIMER) { + for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) { + MetricSnapshot snapshot = dataEntry.getValue(); + Integer cnt = metaCounters.get(meta); + if (cnt != null && cnt > 1) { + Timer timer = timers.get(meta).get(dataEntry.getKey()); + Snapshot snapshot1 = timer.getSnapshot(); + snapshot.set_p50(snapshot1.getMedian()); + snapshot.set_p75(snapshot1.get75thPercentile()); + snapshot.set_p95(snapshot1.get95thPercentile()); + snapshot.set_p98(snapshot1.get98thPercentile()); + snapshot.set_p99(snapshot1.get99thPercentile()); + snapshot.set_p999(snapshot1.get999thPercentile()); + snapshot.set_stddev(snapshot1.getStdDev()); + snapshot.set_min(snapshot1.getMin()); + snapshot.set_max(snapshot1.getMax()); + } + snapshot.set_points(new ArrayList<Long>(0)); + } + } + } + } + + private void resetPoints(Map<String, Map<Integer, MetricSnapshot>> metrics) { + for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) { + String meta = metricEntry.getKey(); + MetricType metricType = MetricUtils.metricType(meta); + Map<Integer, MetricSnapshot> winData = metricEntry.getValue(); + + if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) { + for (MetricSnapshot snapshot : winData.values()) { + snapshot.set_points(new ArrayList<Long>(0)); + } + } + } + } + + protected void adjustCounterMetrics(TopologyMetric tpMetric, TopologyMetric oldMetric) { + if (oldMetric != null) { + mergeCounters(tpMetric.get_streamMetric().get_metrics(), + oldMetric.get_streamMetric().get_metrics()); + + mergeCounters(tpMetric.get_taskMetric().get_metrics(), + oldMetric.get_taskMetric().get_metrics()); + + mergeCounters(tpMetric.get_componentMetric().get_metrics(), + oldMetric.get_componentMetric().get_metrics()); + + mergeCounters(tpMetric.get_workerMetric().get_metrics(), + oldMetric.get_workerMetric().get_metrics()); + + mergeCounters(tpMetric.get_nettyMetric().get_metrics(), + oldMetric.get_nettyMetric().get_metrics()); + } + } + + /** + * sum old counter snapshots and new counter snapshots, sums are stored in new snapshots. + */ + private void mergeCounters(Map<String, Map<Integer, MetricSnapshot>> newCounters, + Map<String, Map<Integer, MetricSnapshot>> oldCounters) { + for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : newCounters.entrySet()) { + String metricName = entry.getKey(); + Map<Integer, MetricSnapshot> snapshots = entry.getValue(); + Map<Integer, MetricSnapshot> oldSnapshots = oldCounters.get(metricName); + if (oldSnapshots != null && oldSnapshots.size() > 0) { + for (Map.Entry<Integer, MetricSnapshot> snapshotEntry : snapshots.entrySet()) { + Integer win = snapshotEntry.getKey(); + MetricSnapshot snapshot = snapshotEntry.getValue(); + MetricSnapshot oldSnapshot = oldSnapshots.get(win); + if (oldSnapshot != null) { + snapshot.set_longValue(snapshot.get_longValue() + oldSnapshot.get_longValue()); + } + } + } + } + } + + private double getSampleRate() { + return ConfigExtension.getMetricSampleRate(conf); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java index eabcd44..71e7cac 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java @@ -31,11 +31,10 @@ public class JstormEvent { this.msgId = msgId; } - public final static EventFactory<JstormEvent> EVENT_FACTORY = - new EventFactory<JstormEvent>() { - public JstormEvent newInstance() { - return new JstormEvent(); - } - }; + public final static EventFactory<JstormEvent> EVENT_FACTORY = new EventFactory<JstormEvent>() { + public JstormEvent newInstance() { + return new JstormEvent(); + } + }; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java index dfc43d1..107441d 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java @@ -33,8 +33,7 @@ public class JstormEventHandler implements EventHandler { } @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { long msgId = Long.parseLong(((JstormEvent) event).getMsgId()); // if (msgId % size ==0) { // logger.warn("consumer msgId=" + msgId + ", seq=" + sequence); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java index 69620bd..8a05662 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java @@ -29,31 +29,28 @@ import org.apache.commons.lang.builder.ToStringStyle; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; /** - * Assignment of one Toplogy, stored in /ZK-DIR/assignments/{topologyid} - * nodeHost {supervisorid: hostname} -- assigned supervisor Map - * taskStartTimeSecs: {taskid, taskStartSeconds} masterCodeDir: topology source - * code's dir in Nimbus taskToResource: {taskid, ResourceAssignment} + * Assignment of one Toplogy, stored in /ZK-DIR/assignments/{topologyid} nodeHost {supervisorid: hostname} -- assigned supervisor Map taskStartTimeSecs: + * {taskid, taskStartSeconds} masterCodeDir: topology source code's dir in Nimbus taskToResource: {taskid, ResourceAssignment} * * @author Lixin/Longda */ public class Assignment implements Serializable { public enum AssignmentType { - Assign, Config + Assign, UpdateTopology, ScaleTopology } private static final long serialVersionUID = 6087667851333314069L; private final String masterCodeDir; /** - * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk - * storage + * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk storage */ private final Map<String, String> nodeHost; private final Map<Integer, Integer> taskStartTimeSecs; private final Set<ResourceWorkerSlot> workers; private long timeStamp; - + private AssignmentType type; public Assignment() { @@ -64,10 +61,8 @@ public class Assignment implements Serializable { this.timeStamp = System.currentTimeMillis(); this.type = AssignmentType.Assign; } - - public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers, - Map<String, String> nodeHost, - Map<Integer, Integer> taskStartTimeSecs) { + + public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers, Map<String, String> nodeHost, Map<Integer, Integer> taskStartTimeSecs) { this.workers = workers; this.nodeHost = nodeHost; this.taskStartTimeSecs = taskStartTimeSecs; @@ -79,11 +74,11 @@ public class Assignment implements Serializable { public void setAssignmentType(AssignmentType type) { this.type = type; } - + public AssignmentType getAssignmentType() { return type; } - + public Map<String, String> getNodeHost() { return nodeHost; } @@ -106,11 +101,9 @@ public class Assignment implements Serializable { * @param supervisorId * @return Map<Integer, WorkerSlot> */ - public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode( - String supervisorId) { + public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(String supervisorId) { - Map<Integer, ResourceWorkerSlot> result = - new HashMap<Integer, ResourceWorkerSlot>(); + Map<Integer, ResourceWorkerSlot> result = new HashMap<Integer, ResourceWorkerSlot>(); for (ResourceWorkerSlot worker : workers) { if (worker.getNodeId().equals(supervisorId)) { result.put(worker.getPort(), worker); @@ -144,8 +137,7 @@ public class Assignment implements Serializable { public Set<Integer> getCurrentWorkerTasks(String supervisorId, int port) { for (ResourceWorkerSlot worker : workers) { - if (worker.getNodeId().equals(supervisorId) - && worker.getPort() == port) + if (worker.getNodeId().equals(supervisorId) && worker.getPort() == port) return worker.getTasks(); } @@ -164,26 +156,24 @@ public class Assignment implements Serializable { return this.timeStamp; } + public boolean isTopologyChange(long oldTimeStamp) { + boolean isChange = false; + if (timeStamp > oldTimeStamp && (type.equals(AssignmentType.UpdateTopology) || type.equals(AssignmentType.ScaleTopology))) + isChange = true; + return isChange; + } + public void updateTimeStamp() { timeStamp = System.currentTimeMillis(); } - + @Override public int hashCode() { final int prime = 31; int result = 1; - result = - prime - * result - + ((masterCodeDir == null) ? 0 : masterCodeDir - .hashCode()); - result = - prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode()); - result = - prime - * result - + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs - .hashCode()); + result = prime * result + ((masterCodeDir == null) ? 0 : masterCodeDir.hashCode()); + result = prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode()); + result = prime * result + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs.hashCode()); result = prime * result + ((workers == null) ? 0 : workers.hashCode()); result = prime * result + (int) (timeStamp & 0xFFFFFFFF); return result; @@ -225,8 +215,7 @@ public class Assignment implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java index 3e7a770..4b9bbb3 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java @@ -32,8 +32,7 @@ public class AssignmentBak implements Serializable { private final Map<String, List<Integer>> componentTasks; private final Assignment assignment; - public AssignmentBak(Map<String, List<Integer>> componentTasks, - Assignment assignment) { + public AssignmentBak(Map<String, List<Integer>> componentTasks, Assignment assignment) { super(); this.componentTasks = componentTasks; this.assignment = assignment; @@ -49,7 +48,6 @@ public class AssignmentBak implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java index d73adfd..77bb883 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java @@ -62,8 +62,7 @@ public class CleanRunnable implements Runnable { try { f.delete(); } catch (Exception e) { - log.error("Cleaning inbox ... error deleting:" - + f.getName() + "," + e); + log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e); } } else { clean(f); @@ -72,8 +71,7 @@ public class CleanRunnable implements Runnable { try { f.delete(); } catch (Exception e) { - log.error("Cleaning inbox ... error deleting:" - + f.getName() + "," + e); + log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java index 14d38d8..a9683fd 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java @@ -28,8 +28,7 @@ public class DelayEventRunnable implements Runnable { private StatusType status; private Object[] args; - public DelayEventRunnable(NimbusData data, String topologyid, - StatusType status, Object[] args) { + public DelayEventRunnable(NimbusData data, String topologyid, StatusType status, Object[] args) { this.data = data; this.topologyid = topologyid; this.status = status; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java index e62c61b..d87b65e 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java @@ -18,21 +18,21 @@ package com.alibaba.jstorm.schedule; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.commons.io.FileExistsException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.utils.Utils; - import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Cluster; @@ -44,10 +44,12 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.NetWorkUtils; import com.alibaba.jstorm.utils.PathUtils; +import backtype.storm.Config; +import backtype.storm.utils.Utils; + public class FollowerRunnable implements Runnable { - private static final Logger LOG = LoggerFactory - .getLogger(FollowerRunnable.class); + private static final Logger LOG = LoggerFactory.getLogger(FollowerRunnable.class); private NimbusData data; @@ -59,50 +61,62 @@ public class FollowerRunnable implements Runnable { private final String hostPort; + public static final String NIMBUS_DIFFER_COUNT_ZK = "nimbus.differ.count.zk"; + + public static final Integer SLAVE_NIMBUS_WAIT_TIME = 120; + @SuppressWarnings("unchecked") public FollowerRunnable(final NimbusData data, int sleepTime) { this.data = data; this.sleepTime = sleepTime; + if (!ConfigExtension.isNimbusUseIp(data.getConf())) { - this.hostPort = - NetWorkUtils.hostname() - + ":" - + String.valueOf(Utils.getInt(data.getConf().get( - Config.NIMBUS_THRIFT_PORT))); + this.hostPort = NetWorkUtils.hostname() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT))); } else { - this.hostPort = - NetWorkUtils.ip() - + ":" - + String.valueOf(Utils.getInt(data.getConf().get( - Config.NIMBUS_THRIFT_PORT))); + this.hostPort = NetWorkUtils.ip() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT))); } try { + String[] hostfigs = this.hostPort.split(":"); boolean isLocaliP = false; - if(hostfigs.length > 0){ + if (hostfigs.length > 0) { isLocaliP = hostfigs[0].equals("127.0.0.1"); } - if(isLocaliP){ + if (isLocaliP) { throw new Exception("the hostname which Nimbus get is localhost"); } - }catch(Exception e1){ - LOG.error("get nimbus host error!", e1); - throw new RuntimeException(e1); - } - try { - this.tryToBeLeader(data.getConf()); } catch (Exception e1) { - // TODO Auto-generated catch block - LOG.error("try to be leader error.", e1); + LOG.error("get nimbus host error!", e1); throw new RuntimeException(e1); } + try { - data.getStormClusterState().update_nimbus_slave(hostPort, - data.uptime()); + data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime()); + data.getStormClusterState().update_nimbus_detail(hostPort, null); } catch (Exception e) { LOG.error("register nimbus host fail!", e); throw new RuntimeException(); } + try{ + update_nimbus_detail(); + }catch (Exception e){ + LOG.error("register detail of nimbus fail!", e); + throw new RuntimeException(); + } + try { + this.tryToBeLeader(data.getConf()); + } catch (Exception e1) { + try { + data.getStormClusterState().unregister_nimbus_host(hostPort); + data.getStormClusterState().unregister_nimbus_detail(hostPort); + }catch (Exception e2){ + LOG.info("due to task errors, so remove register nimbus infomation" ); + }finally { + // TODO Auto-generated catch block + LOG.error("try to be leader error.", e1); + throw new RuntimeException(e1); + } + } callback = new RunnableCallback() { @Override public void run() { @@ -121,6 +135,8 @@ public class FollowerRunnable implements Runnable { return true; } + // Two nimbus running on the same node isn't allowed + // so just checks ip is enough here String[] part = zkMaster.split(":"); return NetWorkUtils.equals(part[0], NetWorkUtils.ip()); } @@ -143,20 +159,21 @@ public class FollowerRunnable implements Runnable { if (data.isLeader() == true) { if (isZkLeader == false) { LOG.info("New ZK master is " + master); - JStormUtils.halt_process(1, - "Lose ZK master node, halt process"); + JStormUtils.halt_process(1, "Lose ZK master node, halt process"); return; } } if (isZkLeader == true) { zkClusterState.unregister_nimbus_host(hostPort); + zkClusterState.unregister_nimbus_detail(hostPort); data.setLeader(true); continue; } check(); zkClusterState.update_nimbus_slave(hostPort, data.uptime()); + update_nimbus_detail(); } catch (InterruptedException e) { // TODO Auto-generated catch block continue; @@ -178,14 +195,29 @@ public class FollowerRunnable implements Runnable { StormClusterState clusterState = data.getStormClusterState(); try { - String master_stormdist_root = - StormConfig.masterStormdistRoot(data.getConf()); + String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf()); - List<String> code_ids = - PathUtils.read_dir_contents(master_stormdist_root); + List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root); List<String> assignments_ids = clusterState.assignments(callback); + Map<String, Assignment> assignmentMap = new HashMap<String, Assignment>(); + List<String> update_ids = new ArrayList<String>(); + for (String id : assignments_ids) { + Assignment assignment = clusterState.assignment_info(id, null); + Long localCodeDownTS; + try { + Long tmp = StormConfig.read_nimbus_topology_timestamp(data.getConf(), id); + localCodeDownTS = (tmp == null ? 0L : tmp); + } catch (FileNotFoundException e) { + localCodeDownTS = 0L; + } + if (assignment != null && assignment.isTopologyChange(localCodeDownTS.longValue())) { + update_ids.add(id); + } + assignmentMap.put(id, assignment); + } + List<String> done_ids = new ArrayList<String>(); for (String id : code_ids) { @@ -199,13 +231,15 @@ public class FollowerRunnable implements Runnable { code_ids.remove(id); } + //redownload topologyid which hava been updated; + assignments_ids.addAll(update_ids); + for (String topologyId : code_ids) { deleteLocalTopology(topologyId); } for (String id : assignments_ids) { - Assignment assignment = clusterState.assignment_info(id, null); - downloadCodeFromMaster(assignment, id); + downloadCodeFromMaster(assignmentMap.get(id), id); } } catch (IOException e) { // TODO Auto-generated catch block @@ -219,8 +253,7 @@ public class FollowerRunnable implements Runnable { } private void deleteLocalTopology(String topologyId) throws IOException { - String dir_to_delete = - StormConfig.masterStormdistRoot(data.getConf(), topologyId); + String dir_to_delete = StormConfig.masterStormdistRoot(data.getConf(), topologyId); try { PathUtils.rmr(dir_to_delete); LOG.info("delete:" + dir_to_delete + "successfully!"); @@ -230,47 +263,113 @@ public class FollowerRunnable implements Runnable { } } - private void downloadCodeFromMaster(Assignment assignment, String topologyId) - throws IOException, TException { + private void downloadCodeFromMaster(Assignment assignment, String topologyId) throws IOException, TException { try { - String localRoot = - StormConfig.masterStormdistRoot(data.getConf(), topologyId); - String tmpDir = - StormConfig.masterInbox(data.getConf()) + "/" - + UUID.randomUUID().toString(); + String localRoot = StormConfig.masterStormdistRoot(data.getConf(), topologyId); + String tmpDir = StormConfig.masterInbox(data.getConf()) + "/" + UUID.randomUUID().toString(); String masterCodeDir = assignment.getMasterCodeDir(); - JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir, - masterCodeDir, topologyId, false); + JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir, masterCodeDir, topologyId, false); - FileUtils.moveDirectory(new File(tmpDir), new File(localRoot)); + File srcDir = new File(tmpDir); + File destDir = new File(localRoot); + try { + FileUtils.moveDirectory(srcDir, destDir); + } catch (FileExistsException e) { + FileUtils.copyDirectory(srcDir, destDir); + FileUtils.deleteQuietly(srcDir); + } + // Update downloadCode timeStamp + StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis()); } catch (TException e) { // TODO Auto-generated catch block - LOG.error(e + " downloadStormCode failed " + "topologyId:" - + topologyId + "masterCodeDir:" - + assignment.getMasterCodeDir()); + LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + assignment.getMasterCodeDir()); throw e; } - LOG.info("Finished downloading code for topology id " + topologyId - + " from " + assignment.getMasterCodeDir()); + LOG.info("Finished downloading code for topology id " + topologyId + " from " + assignment.getMasterCodeDir()); } private void tryToBeLeader(final Map conf) throws Exception { - RunnableCallback masterCallback = new RunnableCallback() { - @Override - public void run() { - try { - tryToBeLeader(conf); - } catch (Exception e) { - LOG.error("To be master error", e); - JStormUtils.halt_process(30, - "Cant't to be master" + e.getMessage()); + boolean allowed = check_nimbus_priority(); + + if (allowed){ + RunnableCallback masterCallback = new RunnableCallback() { + @Override + public void run() { + try { + tryToBeLeader(conf); + } catch (Exception e) { + LOG.error("To be master error", e); + JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage()); + } } - } - }; - data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, - hostPort, masterCallback); + }; + LOG.info("This nimbus can be leader"); + data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, hostPort, masterCallback); + }else { + LOG.info("This nimbus can't be leader"); + } } + /** + * Compared with other nimbus ,get priority of this nimbus + * + * @throws Exception + */ + private boolean check_nimbus_priority() throws Exception { + + int gap = update_nimbus_detail(); + if (gap == 0) { + return true; + } + + int left = SLAVE_NIMBUS_WAIT_TIME; + while(left > 0) { + LOG.info( "After " + left + " seconds, nimbus will try to be Leader!"); + Thread.sleep(10 * 1000); + left -= 10; + } + + StormClusterState zkClusterState = data.getStormClusterState(); + + List<String> followers = zkClusterState.list_dirs(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE, false); + if (followers == null || followers.size() == 0) { + return false; + } + for (String follower : followers) { + if (follower != null && !follower.equals(hostPort)) { + Map bMap = zkClusterState.get_nimbus_detail(follower, false); + if (bMap != null){ + Object object = bMap.get(NIMBUS_DIFFER_COUNT_ZK); + if (object != null && (JStormUtils.parseInt(object)) < gap){ + LOG.info("Current node can't be leader, due to {} has higher priority", follower); + return false; + } + } + } + } + + + + return true; + } + private int update_nimbus_detail() throws Exception { + + //update count = count of zk's binary files - count of nimbus's binary files + StormClusterState zkClusterState = data.getStormClusterState(); + String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf()); + List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root); + List<String> assignments_ids = data.getStormClusterState().assignments(callback); + assignments_ids.removeAll(code_ids); + + Map mtmp = zkClusterState.get_nimbus_detail(hostPort, false); + if (mtmp == null){ + mtmp = new HashMap(); + } + mtmp.put(NIMBUS_DIFFER_COUNT_ZK, assignments_ids.size()); + zkClusterState.update_nimbus_detail(hostPort, mtmp); + LOG.debug("update nimbus's detail " + mtmp); + return assignments_ids.size(); + } /** * Check whether current node is master or not * @@ -291,13 +390,11 @@ public class FollowerRunnable implements Runnable { // current process own master return; } - LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" - + zkHost); + LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" + zkHost); } // current process doesn't own master - String err = - "Current Nimubs fail to own nimbus_master, should halt process"; + String err = "Current Nimubs fail to own nimbus_master, should halt process"; LOG.error(err); JStormUtils.halt_process(0, err); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java index a9d9b92..a6e6093 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java @@ -26,6 +26,5 @@ import com.alibaba.jstorm.utils.FailedAssignTopologyException; public interface IToplogyScheduler { void prepare(Map conf); - Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex) - throws FailedAssignTopologyException; + Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex) throws FailedAssignTopologyException; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java index 8342b79..b7c8e89 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java @@ -17,27 +17,28 @@ */ package com.alibaba.jstorm.schedule; -import java.util.Date; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import backtype.storm.Config; +import backtype.storm.generated.TaskHeartbeat; +import backtype.storm.generated.TopologyTaskHbInfo; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.daemon.nimbus.NimbusData; import com.alibaba.jstorm.daemon.nimbus.NimbusUtils; import com.alibaba.jstorm.daemon.nimbus.StatusType; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; +import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.TimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskDeadEvent; + +import java.util.*; /** - * - * Scan all task's heartbeat, if task isn't alive, DO - * NimbusUtils.transition(monitor) + * Scan all task's heartbeat, if task isn't alive, DO NimbusUtils.transition(monitor) * * @author Longda - * */ public class MonitorRunnable implements Runnable { private static Logger LOG = LoggerFactory.getLogger(MonitorRunnable.class); @@ -49,9 +50,7 @@ public class MonitorRunnable implements Runnable { } /** - * @@@ Todo when one topology is being reassigned, the topology should be - * skip check - * @param data + * @@@ Todo when one topology is being reassigned, the topology should skip check */ @Override public void run() { @@ -80,44 +79,84 @@ public class MonitorRunnable implements Runnable { LOG.info("Failed to get task ids of " + topologyid); continue; } + Assignment assignment = clusterState.assignment_info(topologyid, null); + Set<Integer> deadTasks = new HashSet<Integer>(); boolean needReassign = false; for (Integer task : taskIds) { - boolean isTaskDead = - NimbusUtils.isTaskDead(data, topologyid, task); + boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task); if (isTaskDead == true) { - LOG.info("Found " + topologyid + ",taskid:" + task - + " is dead"); - - ResourceWorkerSlot resource = null; - Assignment assignment = - clusterState.assignment_info(topologyid, null); - if (assignment != null) - resource = assignment.getWorkerByTaskId(task); - if (resource != null) { - Date now = new Date(); - String nowStr = TimeFormat.getSecond(now); - String errorInfo = - "Task-" + task + " is dead on " - + resource.getHostname() + ":" - + resource.getPort() + ", " - + nowStr; - LOG.info(errorInfo); - clusterState.report_task_error(topologyid, task, - errorInfo); - } + deadTasks.add(task); needReassign = true; - break; } } + + + TopologyTaskHbInfo topologyHbInfo = data.getTasksHeartbeat().get(topologyid); if (needReassign == true) { - NimbusUtils.transition(data, topologyid, false, - StatusType.monitor); + if (topologyHbInfo != null) { + int topologyMasterId = topologyHbInfo.get_topologyMasterId(); + if (deadTasks.contains(topologyMasterId)) { + deadTasks.clear(); + if (assignment != null) { + ResourceWorkerSlot resource = assignment.getWorkerByTaskId(topologyMasterId); + if (resource != null) + deadTasks.addAll(resource.getTasks()); + else + deadTasks.add(topologyMasterId); + } + } else { + Map<Integer, TaskHeartbeat> taskHbs = topologyHbInfo.get_taskHbs(); + int launchTime = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS)); + if (taskHbs == null || taskHbs.get(topologyMasterId) == null || taskHbs.get(topologyMasterId).get_uptime() < launchTime) { + /*try { + clusterState.topology_heartbeat(topologyid, topologyHbInfo); + } catch (Exception e) { + LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e); + }*/ + return; + } + } + Map<Integer, ResourceWorkerSlot> deadTaskWorkers = new HashMap<>(); + for (Integer task : deadTasks) { + LOG.info("Found " + topologyid + ",taskid:" + task + " is dead"); + + ResourceWorkerSlot resource = null; + if (assignment != null) + resource = assignment.getWorkerByTaskId(task); + if (resource != null) { + deadTaskWorkers.put(task, resource); + Date now = new Date(); + String nowStr = TimeFormat.getSecond(now); + String errorInfo = "Task-" + task + " is dead on " + resource.getHostname() + ":" + resource.getPort() + ", " + nowStr; + LOG.info(errorInfo); + clusterState.report_task_error(topologyid, task, errorInfo, null); + } + } + + if (deadTaskWorkers.size() > 0) { + // notify jstorm monitor + TaskDeadEvent event = new TaskDeadEvent(); + event.clusterName = data.getClusterName(); + event.topologyId = topologyid; + event.deadTasks = deadTaskWorkers; + event.timestamp = System.currentTimeMillis(); + data.getMetricRunnable().pushEvent(event); + } + } + NimbusUtils.transition(data, topologyid, false, StatusType.monitor); + } + + if (topologyHbInfo != null) { + try { + clusterState.topology_heartbeat(topologyid, topologyHbInfo); + } catch (Exception e) { + LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e); + } } } } catch (Exception e) { - // TODO Auto-generated catch block LOG.error(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java index 12cdad0..9a3a879 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java @@ -38,6 +38,8 @@ public class TopologyAssignContext { public static final int ASSIGN_TYPE_MONITOR = 2; // monitor a topology, some // tasks are dead + protected String topologyId; + protected int assignType; protected StormTopology rawTopology; @@ -51,6 +53,9 @@ public class TopologyAssignContext { protected Map<String, SupervisorInfo> cluster; + protected int topoMasterTaskId; + protected boolean assignSingleWorkerForTM = false; + protected Map<Integer, String> taskToComponent; protected Set<Integer> allTaskIds; // all tasks @@ -76,6 +81,17 @@ public class TopologyAssignContext { this.deadTaskIds = copy.getDeadTaskIds(); this.unstoppedTaskIds = copy.getUnstoppedTaskIds(); this.isReassign = copy.isReassign(); + this.topologyId = copy.getTopologyId(); + this.topoMasterTaskId = copy.getTopologyMasterTaskId(); + this.assignSingleWorkerForTM = copy.getAssignSingleWorkerForTM(); + } + + public String getTopologyId() { + return topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; } public int getAssignType() { @@ -151,8 +167,7 @@ public class TopologyAssignContext { } public static boolean isAssignTypeValid(int type) { - return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE) - || (type == ASSIGN_TYPE_MONITOR); + return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE) || (type == ASSIGN_TYPE_MONITOR); } public Set<ResourceWorkerSlot> getUnstoppedWorkers() { @@ -171,9 +186,24 @@ public class TopologyAssignContext { this.isReassign = isReassign; } + public int getTopologyMasterTaskId() { + return topoMasterTaskId; + } + + public void setTopologyMasterTaskId(int taskId) { + this.topoMasterTaskId = taskId; + } + + public boolean getAssignSingleWorkerForTM() { + return assignSingleWorkerForTM; + } + + public void setAssignSingleWorkerForTM(boolean isAssign) { + this.assignSingleWorkerForTM = isAssign; + } + @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java index 9eb2775..78649aa 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java @@ -17,30 +17,20 @@ */ package com.alibaba.jstorm.schedule.default_assign; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; - import backtype.storm.Config; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; +import backtype.storm.generated.*; import backtype.storm.utils.ThriftTopologyUtils; - +import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.TopologyAssignContext; import com.alibaba.jstorm.utils.FailedAssignTopologyException; import com.alibaba.jstorm.utils.JStormUtils; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +import java.util.*; +import java.util.Map.Entry; public class DefaultTopologyAssignContext extends TopologyAssignContext { @@ -49,19 +39,16 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext { private final Map<String, List<String>> hostToSid; private final Set<ResourceWorkerSlot> oldWorkers; private final Map<String, List<Integer>> componentTasks; - private final Set<ResourceWorkerSlot> unstoppedWorkers = - new HashSet<ResourceWorkerSlot>(); + private final Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>(); private final int totalWorkerNum; private final int unstoppedWorkerNum; private int computeWorkerNum() { - Integer settingNum = - JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS)); + Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS)); - int hintSum = 0; + int ret = 0, hintSum = 0, tmCount = 0; - Map<String, Object> components = - ThriftTopologyUtils.getComponents(sysTopology); + Map<String, Object> components = ThriftTopologyUtils.getComponents(sysTopology); for (Entry<String, Object> entry : components.entrySet()) { String componentName = entry.getKey(); Object component = entry.getValue(); @@ -78,14 +65,35 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext { } int hint = common.get_parallelism_hint(); + if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { + tmCount += hint; + continue; + } hintSum += hint; } if (settingNum == null) { - return hintSum; + ret = hintSum; } else { - return Math.min(settingNum, hintSum); + ret = Math.min(settingNum, hintSum); } + + Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf); + if (isTmSingleWorker != null) { + if (isTmSingleWorker == true) { + // Assign a single worker for topology master + ret += tmCount; + setAssignSingleWorkerForTM(true); + } + } else { + // If not configured, judge this config by worker number + if (ret >= 10) { + ret += tmCount; + setAssignSingleWorkerForTM(true); + } + } + + return ret; } public int computeUnstoppedAssignments() { @@ -149,8 +157,7 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext { try { sysTopology = Common.system_topology(stormConf, rawTopology); } catch (Exception e) { - throw new FailedAssignTopologyException( - "Failed to generate system topology"); + throw new FailedAssignTopologyException("Failed to generate system topology"); } sidToHostname = generateSidToHost(); @@ -215,7 +222,6 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext { } public String toDetailString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java index 5df7de4..99ba9da 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java @@ -32,8 +32,7 @@ import com.alibaba.jstorm.schedule.TopologyAssignContext; import com.alibaba.jstorm.utils.FailedAssignTopologyException; public class DefaultTopologyScheduler implements IToplogyScheduler { - private static final Logger LOG = LoggerFactory - .getLogger(DefaultTopologyScheduler.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyScheduler.class); private Map nimbusConf; @@ -57,8 +56,7 @@ public class DefaultTopologyScheduler implements IToplogyScheduler { for (Integer task : canFree) { ResourceWorkerSlot worker = oldAssigns.getWorkerByTaskId(task); if (worker == null) { - LOG.warn("When free rebalance resource, no ResourceAssignment of task " - + task); + LOG.warn("When free rebalance resource, no ResourceAssignment of task " + task); continue; } @@ -79,25 +77,22 @@ public class DefaultTopologyScheduler implements IToplogyScheduler { } else if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) { needAssign.addAll(context.getAllTaskIds()); needAssign.removeAll(context.getUnstoppedTaskIds()); - } else { - // monitor - needAssign.addAll(context.getDeadTaskIds()); + } else { // ASSIGN_TYPE_MONITOR + Set<Integer> deadTasks = context.getDeadTaskIds(); + needAssign.addAll(deadTasks); } return needAssign; } /** - * Get the task Map which the task is alive and will be kept Only when type - * is ASSIGN_TYPE_MONITOR, it is valid + * Get the task Map which the task is alive and will be kept Only when type is ASSIGN_TYPE_MONITOR, it is valid * * @param defaultContext * @param needAssigns * @return */ - public Set<ResourceWorkerSlot> getKeepAssign( - DefaultTopologyAssignContext defaultContext, - Set<Integer> needAssigns) { + public Set<ResourceWorkerSlot> getKeepAssign(DefaultTopologyAssignContext defaultContext, Set<Integer> needAssigns) { Set<Integer> keepAssignIds = new HashSet<Integer>(); keepAssignIds.addAll(defaultContext.getAllTaskIds()); @@ -125,21 +120,17 @@ public class DefaultTopologyScheduler implements IToplogyScheduler { } @Override - public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context) - throws FailedAssignTopologyException { + public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context) throws FailedAssignTopologyException { int assignType = context.getAssignType(); if (TopologyAssignContext.isAssignTypeValid(assignType) == false) { - throw new FailedAssignTopologyException("Invalide Assign Type " - + assignType); + throw new FailedAssignTopologyException("Invalide Assign Type " + assignType); } - DefaultTopologyAssignContext defaultContext = - new DefaultTopologyAssignContext(context); + DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context); if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) { /** - * Mark all current assigned worker as available. Current assignment - * will be restored in task scheduler. + * Mark all current assigned worker as available. Current assignment will be restored in task scheduler. */ freeUsed(defaultContext); } @@ -148,36 +139,24 @@ public class DefaultTopologyScheduler implements IToplogyScheduler { Set<Integer> needAssignTasks = getNeedAssignTasks(defaultContext); - Set<ResourceWorkerSlot> keepAssigns = - getKeepAssign(defaultContext, needAssignTasks); + Set<ResourceWorkerSlot> keepAssigns = getKeepAssign(defaultContext, needAssignTasks); // please use tree map to make task sequence Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>(); ret.addAll(keepAssigns); ret.addAll(defaultContext.getUnstoppedWorkers()); - int allocWorkerNum = - defaultContext.getTotalWorkerNum() - - defaultContext.getUnstoppedWorkerNum() - - keepAssigns.size(); - LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum=" - + defaultContext.getTotalWorkerNum()); + int allocWorkerNum = defaultContext.getTotalWorkerNum() - defaultContext.getUnstoppedWorkerNum() - keepAssigns.size(); + LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum=" + defaultContext.getTotalWorkerNum() + ", keepWorkerNum=" + keepAssigns.size()); if (allocWorkerNum <= 0) { - LOG.warn("Don't need assign workers, all workers are fine " - + defaultContext.toDetailString()); - throw new FailedAssignTopologyException( - "Don't need assign worker, all workers are fine "); + LOG.warn("Don't need assign workers, all workers are fine " + defaultContext.toDetailString()); + throw new FailedAssignTopologyException("Don't need assign worker, all workers are fine "); } - List<ResourceWorkerSlot> availableWorkers = - WorkerScheduler.getInstance().getAvailableWorkers( - defaultContext, needAssignTasks, allocWorkerNum); - TaskScheduler taskScheduler = - new TaskScheduler(defaultContext, needAssignTasks, - availableWorkers); - Set<ResourceWorkerSlot> assignment = - new HashSet<ResourceWorkerSlot>(taskScheduler.assign()); + List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum); + TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers); + Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign()); ret.addAll(assignment); LOG.info("Keep Alive slots:" + keepAssigns); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java index c218f52..df8eba5 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java @@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.NetWorkUtils; //one worker 's assignment public class ResourceWorkerSlot extends WorkerSlot implements Serializable { - public static Logger LOG = LoggerFactory - .getLogger(ResourceWorkerSlot.class); + public static Logger LOG = LoggerFactory.getLogger(ResourceWorkerSlot.class); private static final long serialVersionUID = 9138386287559932411L; private String hostname; @@ -58,16 +57,14 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable { super(supervisorId, port); } - public ResourceWorkerSlot(WorkerAssignment worker, - Map<String, List<Integer>> componentToTask) { + public ResourceWorkerSlot(WorkerAssignment worker, Map<String, List<Integer>> componentToTask) { super(worker.getNodeId(), worker.getPort()); this.hostname = worker.getHostName(); this.tasks = new HashSet<Integer>(); this.cpu = worker.getCpu(); this.memSize = worker.getMem(); this.jvm = worker.getJvm(); - for (Entry<String, Integer> entry : worker.getComponentToNum() - .entrySet()) { + for (Entry<String, Integer> entry : worker.getComponentToNum().entrySet()) { List<Integer> tasks = componentToTask.get(entry.getKey()); if (tasks == null || tasks.size() == 0) continue; @@ -121,12 +118,10 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } - public boolean compareToUserDefineWorker(WorkerAssignment worker, - Map<Integer, String> taskToComponent) { + public boolean compareToUserDefineWorker(WorkerAssignment worker, Map<Integer, String> taskToComponent) { int cpu = worker.getCpu(); if (cpu != 0 && this.cpu != cpu) return false; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java index eed0e39..1130b1a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java @@ -36,8 +36,7 @@ public abstract class AbstractSelector implements Selector { this.context = context; } - protected List<ResourceWorkerSlot> selectWorker( - List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) { + protected List<ResourceWorkerSlot> selectWorker(List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) { List<ResourceWorkerSlot> result = new ArrayList<ResourceWorkerSlot>(); ResourceWorkerSlot best = null; for (ResourceWorkerSlot worker : list) { @@ -58,8 +57,7 @@ public abstract class AbstractSelector implements Selector { } @Override - public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, - String name) { + public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name) { if (result.size() == 1) return result; result = this.selectWorker(result, workerComparator.get(name));
