http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java new file mode 100644 index 0000000..4e31019 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.merger; + +import java.io.Serializable; +import java.util.Collection; + +public interface Merger<V> extends Serializable { + V merge(Collection<V> objs, V unflushed, Object... others); +}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java new file mode 100644 index 0000000..b4d65e6 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.merger; + +import java.util.Collection; + +import com.alibaba.jstorm.utils.JStormUtils; + +public class SumMerger<T extends Number> implements Merger<T> { + private static final long serialVersionUID = -7026523452570138433L; + + @SuppressWarnings("unchecked") + @Override + public T merge(Collection<T> objs, T unflushed, Object... others) { + // TODO Auto-generated method stub + T ret = unflushed; + for (T obj : objs) { + ret = (T) JStormUtils.add(ret, obj); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java new file mode 100644 index 0000000..4d770d6 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.merger; + +import java.util.Collection; + +import com.alibaba.jstorm.common.metric.old.operator.StartTime; + +public class TpsMerger implements Merger<Double> { + private static final long serialVersionUID = -4534840881635955942L; + protected final long createTime; + + public TpsMerger() { + createTime = System.currentTimeMillis(); + } + + public long getRunMillis(Object... args) { + long startTime = createTime; + + if (args != null) { + if (args[0] != null && args[0] instanceof StartTime) { + StartTime rollingWindow = (StartTime) args[0]; + + startTime = rollingWindow.getStartTime(); + } + } + + return (System.currentTimeMillis() - startTime); + } + + @Override + public Double merge(Collection<Double> objs, Double unflushed, Object... others) { + // TODO Auto-generated method stub + double sum = 0.0d; + if (unflushed != null) { + sum += unflushed; + } + + for (Double item : objs) { + if (item != null) { + sum += item; + } + } + + Double ret = (sum * 1000) / getRunMillis(others); + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java new file mode 100644 index 0000000..a6c06f6 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.updater; + +import com.alibaba.jstorm.utils.JStormUtils; + +public class AddUpdater<T extends Number> implements Updater<T> { + private static final long serialVersionUID = -7955740095421752763L; + + @SuppressWarnings("unchecked") + @Override + public T update(Number object, T cache, Object... others) { + // TODO Auto-generated method stub + return (T) JStormUtils.add(cache, object); + } + + @Override + public T updateBatch(T object, T cache, Object... objects) { + // TODO Auto-generated method stub + return (T) JStormUtils.add(cache, object); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java new file mode 100644 index 0000000..a0b7aa1 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.updater; + +import com.alibaba.jstorm.common.metric.old.Histogram; + +public class AvgUpdater implements Updater<Histogram.HistorgramPair> { + private static final long serialVersionUID = 2562836921724586449L; + + @Override + public Histogram.HistorgramPair update(Number object, Histogram.HistorgramPair cache, Object... others) { + // TODO Auto-generated method stub + if (object == null) { + return cache; + } + if (cache == null) { + cache = new Histogram.HistorgramPair(); + } + + cache.addValue(object.doubleValue()); + cache.addTimes(1l); + + return cache; + } + + @Override + public Histogram.HistorgramPair updateBatch(Histogram.HistorgramPair object, Histogram.HistorgramPair cache, Object... objects) { + // TODO Auto-generated method stub + if (object == null) { + return cache; + } + if (cache == null) { + cache = new Histogram.HistorgramPair(); + } + + cache.addValue(object.getSum()); + cache.addTimes(object.getTimes()); + + return cache; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java new file mode 100644 index 0000000..b25f97b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.updater; + +import com.google.common.util.concurrent.AtomicDouble; + +public class DoubleAddUpdater implements Updater<AtomicDouble> { + private static final long serialVersionUID = -1293565961076552462L; + + @Override + public AtomicDouble update(Number object, AtomicDouble cache, Object... others) { + // TODO Auto-generated method stub + if (cache == null) { + cache = new AtomicDouble(0.0); + } + if (object != null) { + cache.addAndGet(object.doubleValue()); + } + return cache; + } + + @Override + public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache, Object... objects) { + // TODO Auto-generated method stub + return update(object, cache, objects); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java new file mode 100644 index 0000000..8db6f21 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.updater; + +import java.util.concurrent.atomic.AtomicLong; + +public class LongAddUpdater implements Updater<AtomicLong> { + private static final long serialVersionUID = -2185639264737912405L; + + @Override + public AtomicLong update(Number object, AtomicLong cache, Object... others) { + // TODO Auto-generated method stub + if (cache == null) { + cache = new AtomicLong(0); + } + + if (object != null) { + cache.addAndGet(object.longValue()); + } + return cache; + } + + @Override + public AtomicLong updateBatch(AtomicLong object, AtomicLong cache, Object... objects) { + // TODO Auto-generated method stub + return update(object, cache, objects); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java new file mode 100644 index 0000000..42d7b58 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.updater; + +import java.io.Serializable; + +public interface Updater<V> extends Serializable { + V update(Number object, V cache, Object... others); + + V updateBatch(V object, V cache, Object... objects); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java new file mode 100644 index 0000000..244db74 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.window; + +import com.alibaba.jstorm.common.metric.old.operator.Sampling; +import com.alibaba.jstorm.common.metric.old.operator.StartTime; +import com.alibaba.jstorm.common.metric.old.operator.merger.Merger; +import com.alibaba.jstorm.common.metric.old.operator.updater.Updater; + +import java.util.ArrayList; + +public class AllWindow<V> implements Sampling<V>, StartTime { + + private static final long serialVersionUID = -8523514907315740812L; + + protected V unflushed; + protected V defaultValue; + + protected Updater<V> updater; + protected Merger<V> merger; + protected long startTime; + + AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) { + + this.updater = updater; + this.merger = merger; + + this.defaultValue = defaultValue; + this.startTime = System.currentTimeMillis(); + } + + @Override + public void update(Number obj) { + // TODO Auto-generated method stub + synchronized (this) { + unflushed = updater.update(obj, unflushed); + } + } + + public void updateBatch(V batch) { + synchronized (this) { + unflushed = updater.updateBatch(batch, unflushed); + } + } + + @Override + public V getSnapshot() { + // TODO Auto-generated method stub + V ret = merger.merge(new ArrayList<V>(), unflushed, this); + if (ret == null) { + return defaultValue; + } else { + return ret; + } + } + + @Override + public long getStartTime() { + // TODO Auto-generated method stub + return startTime; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java new file mode 100644 index 0000000..e505a1f --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.window; + +import com.alibaba.jstorm.callback.Callback; +import com.alibaba.jstorm.common.metric.old.operator.Sampling; +import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor; +import com.alibaba.jstorm.common.metric.old.operator.merger.Merger; +import com.alibaba.jstorm.common.metric.old.operator.updater.Updater; +import com.alibaba.jstorm.utils.IntervalCheck; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class Metric<T, V> implements Sampling<Map<Integer, T>> { + private static final long serialVersionUID = -1362345159511508074L; + private static final Logger LOG = LoggerFactory.getLogger(Metric.class); + + protected static boolean enable; + + public static void setEnable(boolean e) { + enable = e; + } + + protected List<RollingWindow<V>> rollingWindows; + protected AllWindow<V> allWindow; + + protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW, StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW }; + protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS; + protected V defaultValue; + protected Updater<V> updater; + protected Merger<V> merger; + protected Convertor<V, T> convertor; + protected Callback callback; + + protected int interval; // unit is second + protected IntervalCheck intervalCheck; + protected V unflushed; + + public Metric() { + } + + public int getInterval() { + if (windowSeconds == null || windowSeconds.length == 0) { + return StatBuckets.NUM_STAT_BUCKETS; + } + + int intervals[] = new int[windowSeconds.length]; + int smallest = Integer.MAX_VALUE; + for (int i = 0; i < windowSeconds.length; i++) { + int interval = windowSeconds[i] / bucketSize; + intervals[i] = interval; + if (interval < smallest) { + smallest = interval; + } + } + + for (int goodInterval = smallest; goodInterval > 1; goodInterval--) { + boolean good = true; + for (int interval : intervals) { + if (interval % goodInterval != 0) { + good = false; + break; + } + } + + if (good == true) { + return goodInterval; + } + } + + return 1; + } + + public void init() { + if (defaultValue == null || updater == null || merger == null || convertor == null) { + throw new IllegalArgumentException("Invalid argements"); + } + + rollingWindows = new ArrayList<RollingWindow<V>>(); + if (windowSeconds != null) { + rollingWindows.clear(); + for (int windowSize : windowSeconds) { + RollingWindow<V> rollingWindow = new RollingWindow<V>(defaultValue, windowSize / bucketSize, windowSize, updater, merger); + + rollingWindows.add(rollingWindow); + } + + } + allWindow = new AllWindow<V>(defaultValue, updater, merger); + + this.interval = getInterval(); + this.intervalCheck = new IntervalCheck(); + this.intervalCheck.setInterval(interval); + } + + /** + * In order to improve performance Do + */ + @Override + public void update(Number obj) { + if (enable == false) { + return; + } + + if (intervalCheck.check()) { + flush(); + } + synchronized (this) { + unflushed = updater.update(obj, unflushed); + } + } + + public synchronized void flush() { + if (unflushed == null) { + return; + } + for (RollingWindow<V> rollingWindow : rollingWindows) { + rollingWindow.updateBatch(unflushed); + } + allWindow.updateBatch(unflushed); + unflushed = null; + } + + @Override + public Map<Integer, T> getSnapshot() { + // TODO Auto-generated method stub + flush(); + + Map<Integer, T> ret = new TreeMap<Integer, T>(); + for (RollingWindow<V> rollingWindow : rollingWindows) { + V value = rollingWindow.getSnapshot(); + + ret.put(rollingWindow.getWindowSecond(), convertor.convert(value)); + } + + ret.put(StatBuckets.ALL_TIME_WINDOW, convertor.convert(allWindow.getSnapshot())); + + if (callback != null) { + callback.execute(this); + } + return ret; + } + + public T getAllTimeValue() { + return convertor.convert(allWindow.getSnapshot()); + } + + public int[] getWindowSeconds() { + return windowSeconds; + } + + public void setWindowSeconds(int[] windowSeconds) { + this.windowSeconds = windowSeconds; + } + + public int getBucketSize() { + return bucketSize; + } + + public void setBucketSize(int bucketSize) { + this.bucketSize = bucketSize; + } + + public V getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(V defaultValue) { + this.defaultValue = defaultValue; + } + + public Updater<V> getUpdater() { + return updater; + } + + public void setUpdater(Updater<V> updater) { + this.updater = updater; + } + + public Merger<V> getMerger() { + return merger; + } + + public void setMerger(Merger<V> merger) { + this.merger = merger; + } + + public Convertor<V, T> getConvertor() { + return convertor; + } + + public void setConvertor(Convertor<V, T> convertor) { + this.convertor = convertor; + } + + public Callback getCallback() { + return callback; + } + + public void setCallback(Callback callback) { + this.callback = callback; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java new file mode 100644 index 0000000..5963951 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.window; + +import com.alibaba.jstorm.common.metric.old.operator.Sampling; +import com.alibaba.jstorm.common.metric.old.operator.StartTime; +import com.alibaba.jstorm.common.metric.old.operator.merger.Merger; +import com.alibaba.jstorm.common.metric.old.operator.updater.Updater; +import com.alibaba.jstorm.utils.IntervalCheck; +import com.alibaba.jstorm.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.TreeMap; + +public class RollingWindow<V> implements Sampling<V>, StartTime { + private static final long serialVersionUID = 3794478417380003279L; + private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class); + + protected long startTime; + protected Integer currBucketTime; + protected int interval; // unit is second + protected int windowSecond; + protected IntervalCheck intervalCheck; + + protected TreeMap<Integer, V> buckets; + protected Integer bucketNum; + protected V unflushed; + protected V defaultValue; + + protected Updater<V> updater; + protected Merger<V> merger; + + RollingWindow(V defaultValue, int interval, int windowSecond, Updater<V> updater, Merger<V> merger) { + this.startTime = System.currentTimeMillis(); + this.interval = interval; + this.intervalCheck = new IntervalCheck(); + this.intervalCheck.setInterval(interval); + this.currBucketTime = getCurrBucketTime(); + + this.bucketNum = windowSecond / interval; + this.windowSecond = (bucketNum) * interval; + + this.buckets = new TreeMap<Integer, V>(); + + this.updater = updater; + this.merger = merger; + + this.defaultValue = defaultValue; + + } + + @Override + public void update(Number obj) { + // TODO Auto-generated method stub + + if (intervalCheck.check()) { + rolling(); + } + synchronized (this) { + unflushed = updater.update(obj, unflushed); + + } + + } + + /** + * In order to improve performance Flush one batch to rollingWindow + * + */ + public void updateBatch(V batch) { + + if (intervalCheck.check()) { + rolling(); + } + synchronized (this) { + unflushed = updater.updateBatch(batch, unflushed); + } + + } + + @Override + public V getSnapshot() { + // TODO Auto-generated method stub + if (intervalCheck.check()) { + rolling(); + } + + cleanExpiredBuckets(); + // @@@ Testing + // LOG.info("Raw Data:" + buckets + ",unflushed:" + unflushed); + + Collection<V> values = buckets.values(); + + V ret = merger.merge(values, unflushed, this); + if (ret == null) { + + // @@@ testing + // LOG.warn("!!!!Exist null data !!!!!"); + return defaultValue; + } + return ret; + } + + /* + * Move the "current bucket time" index and clean the expired buckets + */ + protected void rolling() { + synchronized (this) { + if (unflushed != null) { + buckets.put(currBucketTime, unflushed); + unflushed = null; + } + + currBucketTime = getCurrBucketTime(); + + return; + } + } + + protected void cleanExpiredBuckets() { + int nowSec = TimeUtils.current_time_secs(); + int startRemove = nowSec - (interval - 1) - windowSecond; + + List<Integer> removeList = new ArrayList<Integer>(); + + for (Integer keyTime : buckets.keySet()) { + if (keyTime < startRemove) { + removeList.add(keyTime); + } else if (keyTime >= startRemove) { + break; + } + } + + for (Integer removeKey : removeList) { + buckets.remove(removeKey); + // @@@ Testing + // LOG.info("Remove key:" + removeKey + ", diff:" + (nowSec - removeKey)); + + } + + if (buckets.isEmpty() == false) { + Integer first = buckets.firstKey(); + startTime = first.longValue() * 1000; + } + } + + public int getWindowSecond() { + return windowSecond; + } + + public long getStartTime() { + return startTime; + } + + public int getInterval() { + return interval; + } + + public Integer getBucketNum() { + return bucketNum; + } + + public V getDefaultValue() { + return defaultValue; + } + + private Integer getCurrBucketTime() { + return (TimeUtils.current_time_secs() / interval) * interval; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java new file mode 100644 index 0000000..30f5c64 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.window; + +import com.google.common.base.Joiner; + +import java.util.*; + +public class StatBuckets { + + public static final Integer NUM_STAT_BUCKETS = 20; + + public static final Integer MINUTE_WINDOW = 600; + public static final Integer HOUR_WINDOW = 10800; + public static final Integer DAY_WINDOW = 86400; + public static final Integer ALL_TIME_WINDOW = 0; + public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>(); + static { + TIME_WINDOWS.add(ALL_TIME_WINDOW); + TIME_WINDOWS.add(MINUTE_WINDOW); + TIME_WINDOWS.add(HOUR_WINDOW); + TIME_WINDOWS.add(DAY_WINDOW); + } + + public static final String MINUTE_WINDOW_STR = "0d0h10m0s"; + public static final String HOUR_WINDOW_STR = "0d3h0m0s"; + public static final String DAY_WINDOW_STR = "1d0h0m0s"; + public static final String ALL_WINDOW_STR = "All-time"; + + public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS, HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS }; + + private static final String[][] PRETTYSECDIVIDERS = { new String[] { "s", "60" }, new String[] { "m", "60" }, new String[] { "h", "24" }, + new String[] { "d", null } }; + + /** + * Service b + * + * @param key + * @return + */ + public static String parseTimeKey(Integer key) { + if (key == 0) { + return ALL_WINDOW_STR; + } else { + return String.valueOf(key); + } + } + + /** + * + * Default is the latest result + * + * @param showKey + * @return + */ + public static Integer getTimeKey(String showKey) { + Integer window = null; + if (showKey == null) { + window = (MINUTE_WINDOW); + } else if (showKey.equals(MINUTE_WINDOW_STR)) { + window = (MINUTE_WINDOW); + } else if (showKey.equals(HOUR_WINDOW_STR)) { + window = (HOUR_WINDOW); + } else if (showKey.equals(DAY_WINDOW_STR)) { + window = (DAY_WINDOW); + } else if (showKey.equals(ALL_WINDOW_STR)) { + window = ALL_TIME_WINDOW; + } else { + window = MINUTE_WINDOW; + } + + return window; + } + + /** + * Default is the latest result + * + * @param showStr + * @return + */ + public static String getShowTimeStr(Integer time) { + if (time == null) { + return MINUTE_WINDOW_STR; + } else if (time.equals(MINUTE_WINDOW)) { + return MINUTE_WINDOW_STR; + } else if (time.equals(HOUR_WINDOW)) { + return HOUR_WINDOW_STR; + } else if (time.equals(DAY_WINDOW)) { + return DAY_WINDOW_STR; + } else if (time.equals(ALL_TIME_WINDOW)) { + return ALL_WINDOW_STR; + } else { + return MINUTE_WINDOW_STR; + } + + } + + /** + * seconds to string like 1d20h30m40s + * + * @param secs + * @return + */ + public static String prettyUptimeStr(int secs) { + int diversize = PRETTYSECDIVIDERS.length; + + List<String> tmp = new ArrayList<String>(); + int div = secs; + for (int i = 0; i < diversize; i++) { + if (PRETTYSECDIVIDERS[i][1] != null) { + Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]); + tmp.add(div % d + PRETTYSECDIVIDERS[i][0]); + div = div / d; + } else { + tmp.add(div + PRETTYSECDIVIDERS[i][0]); + } + } + + String rtn = ""; + int tmpSzie = tmp.size(); + for (int j = tmpSzie - 1; j > -1; j--) { + rtn += tmp.get(j); + } + return rtn; + } + + /** + * seconds to string like '30m 40s' and '1d 20h 30m 40s' + * + * @param secs + * @return + */ + public static String prettyUptime(int secs) { + int diversize = PRETTYSECDIVIDERS.length; + + LinkedList<String> tmp = new LinkedList<>(); + int div = secs; + for (int i = 0; i < diversize; i++) { + if (PRETTYSECDIVIDERS[i][1] != null) { + Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]); + tmp.addFirst(div % d + PRETTYSECDIVIDERS[i][0]); + div = div / d; + } else { + tmp.addFirst(div + PRETTYSECDIVIDERS[i][0]); + } + if (div <= 0 ) break; + } + + Joiner joiner = Joiner.on(" "); + return joiner.join(tmp); + } + + /** + * @param args + */ + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java deleted file mode 100755 index 3d32cc9..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator; - -import java.io.Serializable; - -public interface Sampling<V> extends Serializable { - - /** - * Update object into Metric - * - * @param obj - */ - void update(Number obj); - - /** - * - * Get snapshot of Metric - * - * @return - */ - V getSnapshot(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java deleted file mode 100755 index 0b6173f..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator; - -public interface StartTime { - long getStartTime(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java deleted file mode 100755 index 8f142f1..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.convert; - -import java.util.concurrent.atomic.AtomicLong; - -public class AtomicLongToLong implements Convertor<AtomicLong, Long> { - private static final long serialVersionUID = -2755066621494409063L; - - @Override - public Long convert(AtomicLong obj) { - // TODO Auto-generated method stub - if (obj == null) { - return null; - } else { - return obj.get(); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java deleted file mode 100755 index 73cdceb..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.convert; - -import java.io.Serializable; - -public interface Convertor<From, To> extends Serializable { - - To convert(From obj); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java deleted file mode 100755 index 47065d0..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.convert; - -public class DefaultConvertor<T> implements Convertor<T, T> { - private static final long serialVersionUID = -647209923903679727L; - - @Override - public T convert(T obj) { - // TODO Auto-generated method stub - return obj; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java deleted file mode 100755 index 4891222..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.convert; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -public class SetToList<T> implements Convertor<Set<T>, List<T>> { - private static final long serialVersionUID = 4968816655779625255L; - - @Override - public List<T> convert(Set<T> set) { - // TODO Auto-generated method stub - List<T> ret = new ArrayList<T>(); - if (set != null) { - for (T item : set) { - ret.add(item); - } - } - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java deleted file mode 100755 index 3ad94f2..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.util.Collection; - -import com.alibaba.jstorm.common.metric.Histogram; -import com.alibaba.jstorm.utils.Pair; - -public class AvgMerger implements Merger<Histogram.HistorgramPair> { - private static final long serialVersionUID = -3892281208959055221L; - - @Override - public Histogram.HistorgramPair merge( - Collection<Histogram.HistorgramPair> objs, - Histogram.HistorgramPair unflushed, Object... others) { - // TODO Auto-generated method stub - double sum = 0.0d; - long times = 0l; - - if (unflushed != null) { - sum = sum + unflushed.getSum(); - times = times + unflushed.getTimes(); - } - - for (Histogram.HistorgramPair item : objs) { - if (item == null) { - continue; - } - sum = sum + item.getSum(); - times = times + item.getTimes(); - } - - return new Histogram.HistorgramPair(sum, times); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak deleted file mode 100755 index 6f82888..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.jstorm.utils.Pair; -import com.google.common.util.concurrent.AtomicDouble; - -public class AvgMerger2 implements Merger<Pair<AtomicDouble, AtomicLong>> { - private static final long serialVersionUID = -3892281208959055221L; - - @Override - public Pair<AtomicDouble, AtomicLong> merge( - Collection<Pair<AtomicDouble, AtomicLong>> objs, - Pair<AtomicDouble, AtomicLong> unflushed, Object... others) { - // TODO Auto-generated method stub - AtomicDouble sum = new AtomicDouble(0.0); - AtomicLong times = new AtomicLong(0); - - if (unflushed != null) { - sum.addAndGet(unflushed.getFirst().get()); - times.addAndGet(unflushed.getSecond().get()); - } - - for (Pair<AtomicDouble, AtomicLong> item : objs) { - if (item == null) { - continue; - } - sum.addAndGet(item.getFirst().get()); - times.addAndGet(item.getSecond().get()); - } - - return new Pair<AtomicDouble, AtomicLong>(sum, times); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java deleted file mode 100755 index 30ded34..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; - -public class LongSumMerger implements Merger<AtomicLong> { - private static final long serialVersionUID = -3500779273677666691L; - - @Override - public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed, - Object... others) { - AtomicLong ret = new AtomicLong(0); - if (unflushed != null) { - ret.addAndGet(unflushed.get()); - } - - for (AtomicLong item : objs) { - if (item == null) { - continue; - } - ret.addAndGet(item.get()); - } - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java deleted file mode 100755 index 0483458..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.io.Serializable; -import java.util.Collection; - -public interface Merger<V> extends Serializable { - V merge(Collection<V> objs, V unflushed, Object... others); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java deleted file mode 100755 index ead3c53..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.util.Collection; - -import com.alibaba.jstorm.utils.JStormUtils; - -public class SumMerger<T extends Number> implements Merger<T> { - private static final long serialVersionUID = -7026523452570138433L; - - @SuppressWarnings("unchecked") - @Override - public T merge(Collection<T> objs, T unflushed, Object... others) { - // TODO Auto-generated method stub - T ret = unflushed; - for (T obj : objs) { - ret = (T) JStormUtils.add(ret, obj); - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java deleted file mode 100755 index 859f642..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.merger; - -import java.util.Collection; - -import com.alibaba.jstorm.common.metric.operator.StartTime; - -public class TpsMerger implements Merger<Double> { - private static final long serialVersionUID = -4534840881635955942L; - protected final long createTime; - - public TpsMerger() { - createTime = System.currentTimeMillis(); - } - - public long getRunMillis(Object... args) { - long startTime = createTime; - - if (args != null) { - if (args[0] != null && args[0] instanceof StartTime) { - StartTime rollingWindow = (StartTime) args[0]; - - startTime = rollingWindow.getStartTime(); - } - } - - return (System.currentTimeMillis() - startTime); - } - - @Override - public Double merge(Collection<Double> objs, Double unflushed, - Object... others) { - // TODO Auto-generated method stub - double sum = 0.0d; - if (unflushed != null) { - sum += unflushed; - } - - for (Double item : objs) { - if (item != null) { - sum += item; - } - } - - Double ret = (sum * 1000) / getRunMillis(others); - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java deleted file mode 100755 index 4fdf813..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import com.alibaba.jstorm.utils.JStormUtils; - -public class AddUpdater<T extends Number> implements Updater<T> { - private static final long serialVersionUID = -7955740095421752763L; - - @SuppressWarnings("unchecked") - @Override - public T update(Number object, T cache, Object... others) { - // TODO Auto-generated method stub - return (T) JStormUtils.add(cache, object); - } - - @Override - public T updateBatch(T object, T cache, Object... objects) { - // TODO Auto-generated method stub - return (T) JStormUtils.add(cache, object); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java deleted file mode 100755 index 30ae46c..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import com.alibaba.jstorm.common.metric.Histogram; - -public class AvgUpdater implements Updater<Histogram.HistorgramPair> { - private static final long serialVersionUID = 2562836921724586449L; - - @Override - public Histogram.HistorgramPair update(Number object, - Histogram.HistorgramPair cache, Object... others) { - // TODO Auto-generated method stub - if (object == null) { - return cache; - } - if (cache == null) { - cache = - new Histogram.HistorgramPair(); - } - - cache.addValue(object.doubleValue()); - cache.addTimes(1l); - - return cache; - } - - @Override - public Histogram.HistorgramPair updateBatch( - Histogram.HistorgramPair object, - Histogram.HistorgramPair cache, Object... objects) { - // TODO Auto-generated method stub - if (object == null) { - return cache; - } - if (cache == null) { - cache = - new Histogram.HistorgramPair(); - } - - cache.addValue(object.getSum()); - cache.addTimes(object.getTimes()); - - return cache; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak deleted file mode 100755 index 44cc70d..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.jstorm.utils.Pair; -import com.google.common.util.concurrent.AtomicDouble; - -public class AvgUpdater2 implements Updater<Pair<AtomicDouble, AtomicLong>> { - private static final long serialVersionUID = 2562836921724586449L; - - @Override - public Pair<AtomicDouble, AtomicLong> update(Number object, - Pair<AtomicDouble, AtomicLong> cache, Object... others) { - // TODO Auto-generated method stub - if (object == null) { - return cache; - } - if (cache == null) { - cache = - new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0), - new AtomicLong(0)); - } - - AtomicDouble sum = cache.getFirst(); - AtomicLong times = cache.getSecond(); - - sum.addAndGet(object.doubleValue()); - times.incrementAndGet(); - - return cache; - } - - @Override - public Pair<AtomicDouble, AtomicLong> updateBatch( - Pair<AtomicDouble, AtomicLong> object, - Pair<AtomicDouble, AtomicLong> cache, Object... objects) { - // TODO Auto-generated method stub - if (object == null) { - return cache; - } - if (cache == null) { - cache = - new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0), - new AtomicLong(0)); - } - - AtomicDouble sum = cache.getFirst(); - AtomicLong times = cache.getSecond(); - - sum.addAndGet(object.getFirst().get()); - times.addAndGet(object.getSecond().get()); - - return cache; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java deleted file mode 100755 index e3b640a..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import com.google.common.util.concurrent.AtomicDouble; - -public class DoubleAddUpdater implements Updater<AtomicDouble> { - private static final long serialVersionUID = -1293565961076552462L; - - @Override - public AtomicDouble update(Number object, AtomicDouble cache, - Object... others) { - // TODO Auto-generated method stub - if (cache == null) { - cache = new AtomicDouble(0.0); - } - if (object != null) { - cache.addAndGet(object.doubleValue()); - } - return cache; - } - - @Override - public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache, - Object... objects) { - // TODO Auto-generated method stub - return update(object, cache, objects); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java deleted file mode 100755 index 4986146..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import java.util.concurrent.atomic.AtomicLong; - -public class LongAddUpdater implements Updater<AtomicLong> { - private static final long serialVersionUID = -2185639264737912405L; - - @Override - public AtomicLong update(Number object, AtomicLong cache, Object... others) { - // TODO Auto-generated method stub - if (cache == null) { - cache = new AtomicLong(0); - } - - if (object != null) { - cache.addAndGet(object.longValue()); - } - return cache; - } - - @Override - public AtomicLong updateBatch(AtomicLong object, AtomicLong cache, - Object... objects) { - // TODO Auto-generated method stub - return update(object, cache, objects); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java deleted file mode 100755 index cb22c4c..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.operator.updater; - -import java.io.Serializable; - -public interface Updater<V> extends Serializable { - V update(Number object, V cache, Object... others); - V updateBatch(V object, V cache, Object... objects ); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java new file mode 100644 index 0000000..2f79141 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java @@ -0,0 +1,20 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +/** + * @author wange + * @since 15/6/5 + */ +public class AsmCounterSnapshot extends AsmSnapshot { + private static final long serialVersionUID = -7574994037947802582L; + + private long v; + + public long getV() { + return v; + } + + public AsmSnapshot setValue(long value) { + this.v = value; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java new file mode 100644 index 0000000..221b5b1 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java @@ -0,0 +1,20 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +/** + * @author wange + * @since 15/6/5 + */ +public class AsmGaugeSnapshot extends AsmSnapshot { + private static final long serialVersionUID = 3216517772824794848L; + + private double v; + + public double getV() { + return v; + } + + public AsmSnapshot setValue(double value) { + this.v = value; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java new file mode 100644 index 0000000..51ac3f5 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java @@ -0,0 +1,22 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +import com.codahale.metrics.Snapshot; + +/** + * @author wange + * @since 15/6/5 + */ +public class AsmHistogramSnapshot extends AsmSnapshot { + private static final long serialVersionUID = 7284437562594156565L; + + private Snapshot snapshot; + + public Snapshot getSnapshot() { + return snapshot; + } + + public AsmSnapshot setSnapshot(Snapshot snapshot) { + this.snapshot = snapshot; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java new file mode 100644 index 0000000..e255e6b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java @@ -0,0 +1,50 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +/** + * @author wange + * @since 15/6/5 + */ +public class AsmMeterSnapshot extends AsmSnapshot { + private static final long serialVersionUID = -1754325312045025810L; + + private double m1; + private double m5; + private double m15; + private double mean; + + public double getM1() { + return m1; + } + + public AsmMeterSnapshot setM1(double m1) { + this.m1 = m1; + return this; + } + + public double getM5() { + return m5; + } + + public AsmMeterSnapshot setM5(double m5) { + this.m5 = m5; + return this; + } + + public double getM15() { + return m15; + } + + public AsmMeterSnapshot setM15(double m15) { + this.m15 = m15; + return this; + } + + public double getMean() { + return mean; + } + + public AsmMeterSnapshot setMean(double mean) { + this.mean = mean; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java new file mode 100644 index 0000000..4c71fe9 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java @@ -0,0 +1,32 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +import java.io.Serializable; + +/** + * @author wange + * @since 15/6/5 + */ +public abstract class AsmSnapshot implements Serializable { + private static final long serialVersionUID = 1945719653840917619L; + + private long metricId; + private long ts; + + public long getTs() { + return ts; + } + + public AsmSnapshot setTs(long ts) { + this.ts = ts; + return this; + } + + public long getMetricId() { + return metricId; + } + + public AsmSnapshot setMetricId(long metricId) { + this.metricId = metricId; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java new file mode 100644 index 0000000..6fec50c --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java @@ -0,0 +1,32 @@ +package com.alibaba.jstorm.common.metric.snapshot; + +import com.codahale.metrics.Snapshot; + +/** + * @author wange + * @since 15/6/5 + */ +public class AsmTimerSnapshot extends AsmSnapshot { + private static final long serialVersionUID = 7784062881728741781L; + + private Snapshot histogram; + private AsmMeterSnapshot meter; + + public Snapshot getHistogram() { + return histogram; + } + + public AsmTimerSnapshot setHistogram(Snapshot snapshot) { + this.histogram = snapshot; + return this; + } + + public AsmMeterSnapshot getMeter() { + return meter; + } + + public AsmTimerSnapshot setMeter(AsmMeterSnapshot meter) { + this.meter = meter; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java deleted file mode 100755 index 8475e4c..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.window; - -import java.util.ArrayList; - -import com.alibaba.jstorm.common.metric.operator.Sampling; -import com.alibaba.jstorm.common.metric.operator.StartTime; -import com.alibaba.jstorm.common.metric.operator.merger.Merger; -import com.alibaba.jstorm.common.metric.operator.updater.Updater; - -public class AllWindow<V> implements Sampling<V>, StartTime { - - private static final long serialVersionUID = -8523514907315740812L; - - protected V unflushed; - protected V defaultValue; - - protected Updater<V> updater; - protected Merger<V> merger; - protected long startTime; - - AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) { - - this.updater = updater; - this.merger = merger; - - this.defaultValue = defaultValue; - this.startTime = System.currentTimeMillis(); - } - - @Override - public void update(Number obj) { - // TODO Auto-generated method stub - synchronized (this) { - unflushed = updater.update(obj, unflushed); - } - } - - public void updateBatch(V batch) { - synchronized (this) { - unflushed = updater.updateBatch(batch, unflushed); - } - } - - @Override - public V getSnapshot() { - // TODO Auto-generated method stub - V ret = merger.merge(new ArrayList<V>(), unflushed, this); - if (ret == null) { - return defaultValue; - } else { - return ret; - } - } - - @Override - public long getStartTime() { - // TODO Auto-generated method stub - return startTime; - } - -}
