http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java deleted file mode 100755 index 63a725a..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.window; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.callback.Callback; -import com.alibaba.jstorm.common.metric.operator.Sampling; -import com.alibaba.jstorm.common.metric.operator.convert.Convertor; -import com.alibaba.jstorm.common.metric.operator.merger.Merger; -import com.alibaba.jstorm.common.metric.operator.updater.Updater; -import com.alibaba.jstorm.utils.IntervalCheck; - -public class Metric<T, V> implements Sampling<Map<Integer, T>> { - private static final long serialVersionUID = -1362345159511508074L; - private static final Logger LOG = LoggerFactory.getLogger(Metric.class); - - protected static boolean enable; - - public static void setEnable(boolean e) { - enable = e; - } - - protected List<RollingWindow<V>> rollingWindows; - protected AllWindow<V> allWindow; - - protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW, - StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW }; - protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS; - protected V defaultValue; - protected Updater<V> updater; - protected Merger<V> merger; - protected Convertor<V, T> convertor; - protected Callback callback; - - protected int interval; // unit is second - protected IntervalCheck intervalCheck; - protected V unflushed; - - public Metric() { - } - - public int getInterval() { - if (windowSeconds == null || windowSeconds.length == 0) { - return StatBuckets.NUM_STAT_BUCKETS; - } - - int intervals[] = new int[windowSeconds.length]; - int smallest = Integer.MAX_VALUE; - for (int i = 0; i < windowSeconds.length; i++) { - int interval = windowSeconds[i] / bucketSize; - intervals[i] = interval; - if (interval < smallest) { - smallest = interval; - } - } - - for (int goodInterval = smallest; goodInterval > 1; goodInterval--) { - boolean good = true; - for (int interval : intervals) { - if (interval % goodInterval != 0) { - good = false; - break; - } - } - - if (good == true) { - return goodInterval; - } - } - - return 1; - } - - public void init() { - if (defaultValue == null || updater == null || merger == null - || convertor == null) { - throw new IllegalArgumentException("Invalid argements"); - } - - rollingWindows = new ArrayList<RollingWindow<V>>(); - if (windowSeconds != null) { - rollingWindows.clear(); - for (int windowSize : windowSeconds) { - RollingWindow<V> rollingWindow = - new RollingWindow<V>(defaultValue, windowSize - / bucketSize, windowSize, updater, merger); - - rollingWindows.add(rollingWindow); - } - - } - allWindow = new AllWindow<V>(defaultValue, updater, merger); - - this.interval = getInterval(); - this.intervalCheck = new IntervalCheck(); - this.intervalCheck.setInterval(interval); - } - - /** - * In order to improve performance - * Do - */ - @Override - public void update(Number obj) { - if (enable == false) { - return; - } - - if (intervalCheck.check()) { - flush(); - } - synchronized (this) { - unflushed = updater.update(obj, unflushed); - } - } - - public synchronized void flush() { - if (unflushed == null) { - return; - } - for (RollingWindow<V> rollingWindow : rollingWindows) { - rollingWindow.updateBatch(unflushed); - } - allWindow.updateBatch(unflushed); - unflushed = null; - } - - @Override - public Map<Integer, T> getSnapshot() { - // TODO Auto-generated method stub - flush(); - - Map<Integer, T> ret = new TreeMap<Integer, T>(); - for (RollingWindow<V> rollingWindow : rollingWindows) { - V value = rollingWindow.getSnapshot(); - - ret.put(rollingWindow.getWindowSecond(), convertor.convert(value)); - } - - ret.put(StatBuckets.ALL_TIME_WINDOW, - convertor.convert(allWindow.getSnapshot())); - - if (callback != null) { - callback.execute(this); - } - return ret; - } - - public T getAllTimeValue() { - return convertor.convert(allWindow.getSnapshot()); - } - - public int[] getWindowSeconds() { - return windowSeconds; - } - - public void setWindowSeconds(int[] windowSeconds) { - this.windowSeconds = windowSeconds; - } - - public int getBucketSize() { - return bucketSize; - } - - public void setBucketSize(int bucketSize) { - this.bucketSize = bucketSize; - } - - public V getDefaultValue() { - return defaultValue; - } - - public void setDefaultValue(V defaultValue) { - this.defaultValue = defaultValue; - } - - public Updater<V> getUpdater() { - return updater; - } - - public void setUpdater(Updater<V> updater) { - this.updater = updater; - } - - public Merger<V> getMerger() { - return merger; - } - - public void setMerger(Merger<V> merger) { - this.merger = merger; - } - - public Convertor<V, T> getConvertor() { - return convertor; - } - - public void setConvertor(Convertor<V, T> convertor) { - this.convertor = convertor; - } - - public Callback getCallback() { - return callback; - } - - public void setCallback(Callback callback) { - this.callback = callback; - } - -}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java deleted file mode 100755 index 54047a6..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.window; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.TreeMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.common.metric.operator.Sampling; -import com.alibaba.jstorm.common.metric.operator.StartTime; -import com.alibaba.jstorm.common.metric.operator.merger.Merger; -import com.alibaba.jstorm.common.metric.operator.updater.Updater; -import com.alibaba.jstorm.utils.IntervalCheck; -import com.alibaba.jstorm.utils.TimeUtils; - -public class RollingWindow<V> implements Sampling<V>, StartTime { - private static final long serialVersionUID = 3794478417380003279L; - private static final Logger LOG = LoggerFactory - .getLogger(RollingWindow.class); - - protected long startTime; - protected Integer currBucketTime; - protected int interval; // unit is second - protected int windowSecond; - protected IntervalCheck intervalCheck; - - protected TreeMap<Integer, V> buckets; - protected Integer bucketNum; - protected V unflushed; - protected V defaultValue; - - protected Updater<V> updater; - protected Merger<V> merger; - - RollingWindow(V defaultValue, int interval, int windowSecond, - Updater<V> updater, Merger<V> merger) { - this.startTime = System.currentTimeMillis(); - this.interval = interval; - this.intervalCheck = new IntervalCheck(); - this.intervalCheck.setInterval(interval); - this.currBucketTime = getCurrBucketTime(); - - this.bucketNum = windowSecond / interval; - this.windowSecond = (bucketNum) * interval; - - this.buckets = new TreeMap<Integer, V>(); - - this.updater = updater; - this.merger = merger; - - this.defaultValue = defaultValue; - - } - - - @Override - public void update(Number obj) { - // TODO Auto-generated method stub - - if (intervalCheck.check()) { - rolling(); - } - synchronized (this) { - unflushed = updater.update(obj, unflushed); - - } - - } - - /** - * In order to improve performance - * Flush one batch to rollingWindow - * - */ - public void updateBatch(V batch) { - - if (intervalCheck.check()) { - rolling(); - } - synchronized (this) { - unflushed = updater.updateBatch(batch, unflushed); - } - - } - - @Override - public V getSnapshot() { - // TODO Auto-generated method stub - if (intervalCheck.check()) { - rolling(); - } - - cleanExpiredBuckets(); - // @@@ Testing - //LOG.info("Raw Data:" + buckets + ",unflushed:" + unflushed); - - Collection<V> values = buckets.values(); - - V ret = merger.merge(values, unflushed, this); - if (ret == null) { - - // @@@ testing - //LOG.warn("!!!!Exist null data !!!!!"); - return defaultValue; - } - return ret; - } - - /* - * Move the "current bucket time" index and clean the expired buckets - */ - protected void rolling() { - synchronized (this) { - if (unflushed != null) { - buckets.put(currBucketTime, unflushed); - unflushed = null; - } - - currBucketTime = getCurrBucketTime(); - - return ; - } - } - - protected void cleanExpiredBuckets() { - int nowSec = TimeUtils.current_time_secs(); - int startRemove = nowSec - (interval - 1) - windowSecond; - - List<Integer> removeList = new ArrayList<Integer>(); - - for (Integer keyTime : buckets.keySet()) { - if (keyTime < startRemove) { - removeList.add(keyTime); - } else if (keyTime >= startRemove) { - break; - } - } - - for (Integer removeKey : removeList) { - buckets.remove(removeKey); - // @@@ Testing - //LOG.info("Remove key:" + removeKey + ", diff:" + (nowSec - removeKey)); - - } - - if (buckets.isEmpty() == false) { - Integer first = buckets.firstKey(); - startTime = first.longValue() * 1000; - } - } - - public int getWindowSecond() { - return windowSecond; - } - - public long getStartTime() { - return startTime; - } - - public int getInterval() { - return interval; - } - - public Integer getBucketNum() { - return bucketNum; - } - - public V getDefaultValue() { - return defaultValue; - } - - private Integer getCurrBucketTime() { - return (TimeUtils.current_time_secs() / interval) * interval; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java deleted file mode 100755 index 3e9b021..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric.window; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -public class StatBuckets { - - public static final Integer NUM_STAT_BUCKETS = 20; - - public static final Integer MINUTE_WINDOW = 600; - public static final Integer HOUR_WINDOW = 10800; - public static final Integer DAY_WINDOW = 86400; - public static final Integer ALL_TIME_WINDOW = 0; - public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>(); - static { - TIME_WINDOWS.add(ALL_TIME_WINDOW); - TIME_WINDOWS.add(MINUTE_WINDOW); - TIME_WINDOWS.add(HOUR_WINDOW); - TIME_WINDOWS.add(DAY_WINDOW); - } - - public static final String MINUTE_WINDOW_STR = "0d0h10m0s"; - public static final String HOUR_WINDOW_STR = "0d3h0m0s"; - public static final String DAY_WINDOW_STR = "1d0h0m0s"; - public static final String ALL_WINDOW_STR = "All-time"; - - public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS, - HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS }; - - private static final String[][] PRETTYSECDIVIDERS = { - new String[] { "s", "60" }, new String[] { "m", "60" }, - new String[] { "h", "24" }, new String[] { "d", null } }; - - /** - * Service b - * - * @param key - * @return - */ - public static String parseTimeKey(Integer key) { - if (key == 0) { - return ALL_WINDOW_STR; - } else { - return String.valueOf(key); - } - } - - /** - * - * Default is the latest result - * - * @param showKey - * @return - */ - public static Integer getTimeKey(String showKey) { - Integer window = null; - if (showKey == null) { - window = (MINUTE_WINDOW); - } else if (showKey.equals(MINUTE_WINDOW_STR)) { - window = (MINUTE_WINDOW); - } else if (showKey.equals(HOUR_WINDOW_STR)) { - window = (HOUR_WINDOW); - } else if (showKey.equals(DAY_WINDOW_STR)) { - window = (DAY_WINDOW); - } else if (showKey.equals(ALL_WINDOW_STR)) { - window = ALL_TIME_WINDOW; - } else { - window = MINUTE_WINDOW; - } - - return window; - } - - /** - * Default is the latest result - * - * @param showStr - * @return - */ - public static String getShowTimeStr(Integer time) { - if (time == null) { - return MINUTE_WINDOW_STR; - } else if (time.equals(MINUTE_WINDOW)) { - return MINUTE_WINDOW_STR; - } else if (time.equals(HOUR_WINDOW)) { - return HOUR_WINDOW_STR; - } else if (time.equals(DAY_WINDOW)) { - return DAY_WINDOW_STR; - } else if (time.equals(ALL_TIME_WINDOW)) { - return ALL_WINDOW_STR; - } else { - return MINUTE_WINDOW_STR; - } - - } - - /** - * seconds to string like 1d20h30m40s - * - * @param secs - * @return - */ - public static String prettyUptimeStr(int secs) { - int diversize = PRETTYSECDIVIDERS.length; - - List<String> tmp = new ArrayList<String>(); - int div = secs; - for (int i = 0; i < diversize; i++) { - if (PRETTYSECDIVIDERS[i][1] != null) { - Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]); - tmp.add(div % d + PRETTYSECDIVIDERS[i][0]); - div = div / d; - } else { - tmp.add(div + PRETTYSECDIVIDERS[i][0]); - } - } - - String rtn = ""; - int tmpSzie = tmp.size(); - for (int j = tmpSzie - 1; j > -1; j--) { - rtn += tmp.get(j); - } - return rtn; - } - - /** - * @param args - */ - public static void main(String[] args) { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java deleted file mode 100755 index 2dbab6f..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.stats; - -public enum StaticsType { - emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies; -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java index d9148db..15b1cfa 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java @@ -100,9 +100,8 @@ public class CgroupCenter implements CgroupOperation { SubSystemType type = SubSystemType.getSubSystem(split[0]); if (type == null) continue; - subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), - Integer.valueOf(split[2]), Integer.valueOf(split[3]) - .intValue() == 1 ? true : false)); + subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true + : false)); } return subSystems; } catch (Exception e) { @@ -168,8 +167,7 @@ public class CgroupCenter implements CgroupOperation { if (!CgroupUtils.dirExists(hierarchy.getDir())) new File(hierarchy.getDir()).mkdirs(); String subSystems = CgroupUtils.reAnalyse(subsystems); - SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", - subSystems); + SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); } @@ -217,8 +215,7 @@ public class CgroupCenter implements CgroupOperation { } public static void main(String args[]) { - System.out.println(CgroupCenter.getInstance().getHierarchies().get(0) - .getRootCgroups().getChildren().size()); + System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java index 4de2d5a..0cc45cc 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java @@ -82,8 +82,7 @@ public class CgroupUtils { return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE); } - public static List<String> readFileByLine(String fileDir) - throws IOException { + public static List<String> readFileByLine(String fileDir) throws IOException { List<String> result = new ArrayList<String>(); FileReader fileReader = null; BufferedReader reader = null; @@ -101,8 +100,7 @@ public class CgroupUtils { return result; } - public static void writeFileByLine(String fileDir, List<String> strings) - throws IOException { + public static void writeFileByLine(String fileDir, List<String> strings) throws IOException { FileWriter writer = null; BufferedWriter bw = null; try { @@ -123,8 +121,7 @@ public class CgroupUtils { } } - public static void writeFileByLine(String fileDir, String string) - throws IOException { + public static void writeFileByLine(String fileDir, String string) throws IOException { FileWriter writer = null; BufferedWriter bw = null; try { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java index 1655e49..20d4ec0 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java @@ -27,8 +27,7 @@ public class SubSystem { private boolean enable; - public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, - boolean enable) { + public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) { this.type = type; this.hierarchyID = hierarchyID; this.cgroupsNum = cgroupNum; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java index 0a772f6..224d05d 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java @@ -59,9 +59,7 @@ public class CgroupCommon implements CgroupCommonOperation { this.parent = parent; this.dir = parent.getDir() + "/" + name; this.init(); - cores = - CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), - this.dir); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); this.isRoot = false; } @@ -74,23 +72,19 @@ public class CgroupCommon implements CgroupCommonOperation { this.parent = null; this.dir = dir; this.init(); - cores = - CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), - this.dir); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); this.isRoot = true; } @Override public void addTask(int taskId) throws IOException { // TODO Auto-generated method stub - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), - String.valueOf(taskId)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId)); } @Override public Set<Integer> getTasks() throws IOException { - List<String> stringTasks = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); + List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); Set<Integer> tasks = new HashSet<Integer>(); for (String task : stringTasks) { tasks.add(Integer.valueOf(task)); @@ -101,16 +95,13 @@ public class CgroupCommon implements CgroupCommonOperation { @Override public void addProcs(int pid) throws IOException { // TODO Auto-generated method stub - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), - String.valueOf(pid)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); } @Override public Set<Integer> getPids() throws IOException { // TODO Auto-generated method stub - List<String> stringPids = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - CGROUP_PROCS)); + List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS)); Set<Integer> pids = new HashSet<Integer>(); for (String task : stringPids) { pids.add(Integer.valueOf(task)); @@ -121,16 +112,12 @@ public class CgroupCommon implements CgroupCommonOperation { @Override public void setNotifyOnRelease(boolean flag) throws IOException { // TODO Auto-generated method stub - CgroupUtils - .writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), - flag ? "1" : "0"); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); } @Override public boolean getNotifyOnRelease() throws IOException { - return CgroupUtils - .readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)) - .get(0).equals("1") ? true : false; + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false; } @Override @@ -138,16 +125,14 @@ public class CgroupCommon implements CgroupCommonOperation { // TODO Auto-generated method stub if (!this.isRoot) return; - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), - command); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command); } @Override public String getReleaseAgent() throws IOException { if (!this.isRoot) return null; - return CgroupUtils.readFileByLine( - Constants.getDir(this.dir, RELEASE_AGENT)).get(0); + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0); } @Override @@ -155,21 +140,16 @@ public class CgroupCommon implements CgroupCommonOperation { // TODO Auto-generated method stub if (!this.cores.keySet().contains(SubSystemType.cpuset)) return; - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, - CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); } @Override public boolean getCgroupCloneChildren() throws IOException { - return CgroupUtils - .readFileByLine( - Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)) - .get(0).equals("1") ? true : false; + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; } @Override - public void setEventControl(String eventFd, String controlFd, - String... args) throws IOException { + public void setEventControl(String eventFd, String controlFd, String... args) throws IOException { // TODO Auto-generated method stub StringBuilder sb = new StringBuilder(); sb.append(eventFd); @@ -179,10 +159,7 @@ public class CgroupCommon implements CgroupCommonOperation { sb.append(' '); sb.append(arg); } - CgroupUtils - .writeFileByLine( - Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), - sb.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); } public Hierarchy getHierarchy() { @@ -240,8 +217,7 @@ public class CgroupCommon implements CgroupCommonOperation { return; for (File child : files) { if (child.isDirectory()) { - this.children.add(new CgroupCommon(child.getName(), - this.hierarchy, this)); + this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java index 3f9090f..a76b09c 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java @@ -42,7 +42,6 @@ public interface CgroupCommonOperation { public boolean getCgroupCloneChildren() throws IOException; - public void setEventControl(String eventFd, String controlFd, - String... args) throws IOException; + public void setEventControl(String eventFd, String controlFd, String... args) throws IOException; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java index 279366a..2b3f3a8 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java @@ -35,10 +35,8 @@ import com.alibaba.jstorm.container.cgroup.core.NetPrioCore; public class CgroupCoreFactory { - public static Map<SubSystemType, CgroupCore> getInstance( - Set<SubSystemType> types, String dir) { - Map<SubSystemType, CgroupCore> result = - new HashMap<SubSystemType, CgroupCore>(); + public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) { + Map<SubSystemType, CgroupCore> result = new HashMap<SubSystemType, CgroupCore>(); for (SubSystemType type : types) { switch (type) { case blkio: http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java index 5d487ec..9958114 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java @@ -33,25 +33,18 @@ public class BlkioCore implements CgroupCore { public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device"; public static final String BLKIO_RESET_STATS = "/blkio.reset_stats"; - public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = - "/blkio.throttle.read_bps_device"; - public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = - "/blkio.throttle.write_bps_device"; - public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = - "/blkio.throttle.read_iops_device"; - public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = - "/blkio.throttle.write_iops_device"; - - public static final String BLKIO_THROTTLE_IO_SERVICED = - "/blkio.throttle.io_serviced"; - public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = - "/blkio.throttle.io_service_bytes"; + public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device"; + public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device"; + public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device"; + public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device"; + + public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced"; + public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes"; public static final String BLKIO_TIME = "/blkio.time"; public static final String BLKIO_SECTORS = "/blkio.sectors"; public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced"; - public static final String BLKIO_IO_SERVICE_BYTES = - "/blkio.io_service_bytes"; + public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes"; public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time"; public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time"; public static final String BLKIO_IO_MERGED = "/blkio.io_merged"; @@ -71,28 +64,19 @@ public class BlkioCore implements CgroupCore { /* weight: 100-1000 */ public void setBlkioWeight(int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), - String.valueOf(weight)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight)); } public int getBlkioWeight() throws IOException { - return Integer.valueOf( - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)) - .intValue(); + return Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue(); } - public void setBlkioWeightDevice(Device device, int weight) - throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), - makeContext(device, weight)); + public void setBlkioWeightDevice(Device device, int weight) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight)); } public Map<Device, Integer> getBlkioWeightDevice() throws IOException { - List<String> strings = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_WEIGHT_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE)); Map<Device, Integer> result = new HashMap<Device, Integer>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -104,15 +88,11 @@ public class BlkioCore implements CgroupCore { } public void setReadBps(Device device, long bps) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), - makeContext(device, bps)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps)); } public Map<Device, Long> getReadBps() throws IOException { - List<String> strings = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_THROTTLE_READ_BPS_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE)); Map<Device, Long> result = new HashMap<Device, Long>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -124,15 +104,11 @@ public class BlkioCore implements CgroupCore { } public void setWriteBps(Device device, long bps) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), - makeContext(device, bps)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps)); } public Map<Device, Long> getWriteBps() throws IOException { - List<String> strings = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_THROTTLE_WRITE_BPS_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE)); Map<Device, Long> result = new HashMap<Device, Long>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -144,15 +120,11 @@ public class BlkioCore implements CgroupCore { } public void setReadIOps(Device device, long iops) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), - makeContext(device, iops)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops)); } public Map<Device, Long> getReadIOps() throws IOException { - List<String> strings = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_THROTTLE_READ_IOPS_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE)); Map<Device, Long> result = new HashMap<Device, Long>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -164,15 +136,11 @@ public class BlkioCore implements CgroupCore { } public void setWriteIOps(Device device, long iops) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), - makeContext(device, iops)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops)); } public Map<Device, Long> getWriteIOps() throws IOException { - List<String> strings = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_THROTTLE_WRITE_IOPS_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE)); Map<Device, Long> result = new HashMap<Device, Long>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -183,23 +151,17 @@ public class BlkioCore implements CgroupCore { return result; } - public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_THROTTLE_IO_SERVICED))); + public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED))); } - public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES))); + public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES))); } public Map<Device, Long> getBlkioTime() throws IOException { Map<Device, Long> result = new HashMap<Device, Long>(); - List<String> strs = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_TIME)); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME)); for (String str : strs) { String[] strArgs = str.split(" "); result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); @@ -209,9 +171,7 @@ public class BlkioCore implements CgroupCore { public Map<Device, Long> getBlkioSectors() throws IOException { Map<Device, Long> result = new HashMap<Device, Long>(); - List<String> strs = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - BLKIO_SECTORS)); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS)); for (String str : strs) { String[] strArgs = str.split(" "); result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); @@ -219,43 +179,32 @@ public class BlkioCore implements CgroupCore { return result; } - public Map<Device, Map<RecordType, Long>> getIOServiced() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_SERVICED))); + public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICED))); } - public Map<Device, Map<RecordType, Long>> getIOServiceBytes() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_SERVICE_BYTES))); + public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_BYTES))); } - public Map<Device, Map<RecordType, Long>> getIOServiceTime() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_SERVICE_TIME))); + public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_TIME))); } - public Map<Device, Map<RecordType, Long>> getIOWaitTime() - throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_WAIT_TIME))); + public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_WAIT_TIME))); } public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_MERGED))); + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_MERGED))); } public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir( - this.dir, BLKIO_IO_QUEUED))); + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_QUEUED))); } public void resetStats() throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, BLKIO_RESET_STATS), "1"); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_RESET_STATS), "1"); } private String makeContext(Device device, Object data) { @@ -265,8 +214,7 @@ public class BlkioCore implements CgroupCore { } private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) { - Map<Device, Map<RecordType, Long>> result = - new HashMap<Device, Map<RecordType, Long>>(); + Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>(); for (String str : strs) { String[] strArgs = str.split(" "); if (strArgs.length != 3) http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java index 609898e..6a723a0 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java @@ -46,62 +46,47 @@ public class CpuCore implements CgroupCore { } public void setCpuShares(int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), - String.valueOf(weight)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); } public int getCpuShares() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPU_SHARES)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0)); } public void setCpuRtRuntimeUs(long us) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPU_RT_RUNTIME_US), - String.valueOf(us)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us)); } public long getCpuRtRuntimeUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0)); } public void setCpuRtPeriodUs(long us) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPU_RT_PERIOD_US), - String.valueOf(us)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us)); } public Long getCpuRtPeriodUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0)); } public void setCpuCfsPeriodUs(long us) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPU_CFS_PERIOD_US), - String.valueOf(us)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us)); } public Long getCpuCfsPeriodUs(long us) throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0)); } public void setCpuCfsQuotaUs(long us) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPU_CFS_QUOTA_US), - String.valueOf(us)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); } public Long getCpuCfsQuotaUs(long us) throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0)); } public Stat getCpuStat() throws IOException { - return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, - CPU_STAT))); + return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT))); } public static class Stat { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java index c54421b..8bec196 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java @@ -45,14 +45,11 @@ public class CpuacctCore implements CgroupCore { } public Long getCpuUsage() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUACCT_USAGE)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE)).get(0)); } public Map<StatType, Long> getCpuStat() throws IOException { - List<String> strs = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - CPUACCT_STAT)); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT)); Map<StatType, Long> result = new HashMap<StatType, Long>(); result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1])); result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1])); @@ -60,10 +57,7 @@ public class CpuacctCore implements CgroupCore { } public Long[] getPerCpuUsage() throws IOException { - String str = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)) - .get(0); + String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0); String[] strArgs = str.split(" "); Long[] result = new Long[strArgs.length]; for (int i = 0; i < result.length; i++) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java index d693b6c..02bcace 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java @@ -32,18 +32,12 @@ public class CpusetCore implements CgroupCore { public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive"; public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive"; public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall"; - public static final String CPUSET_MEMORY_PRESSURE = - "/cpuset.memory_pressure"; - public static final String CPUSET_MEMORY_PRESSURE_ENABLED = - "/cpuset.memory_pressure_enabled"; - public static final String CPUSET_MEMORY_SPREAD_PAGE = - "/cpuset.memory_spread_page"; - public static final String CPUSET_MEMORY_SPREAD_SLAB = - "/cpuset.memory_spread_slab"; - public static final String CPUSET_SCHED_LOAD_BALANCE = - "/cpuset.sched_load_balance"; - public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = - "/cpuset.sched_relax_domain_level"; + public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure"; + public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled"; + public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page"; + public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab"; + public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance"; + public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level"; private final String dir; @@ -64,14 +58,11 @@ public class CpusetCore implements CgroupCore { sb.append(','); } sb.deleteCharAt(sb.length() - 1); - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), - sb.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), sb.toString()); } public int[] getCpus() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_CPUS)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPUS)).get(0); return parseNums(output); } @@ -82,147 +73,97 @@ public class CpusetCore implements CgroupCore { sb.append(','); } sb.deleteCharAt(sb.length() - 1); - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), - sb.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), sb.toString()); } public int[] getMems() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEMS)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMS)).get(0); return parseNums(output); } public void setMemMigrate(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0)); } public boolean isMemMigrate() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get( - 0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0)); return output > 0; } public void setCpuExclusive(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0)); } public boolean isCpuExclusive() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0)); return output > 0; } public void setMemExclusive(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0)); } public boolean isMemExclusive() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0)); return output > 0; } public void setMemHardwall(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 1 : 0)); } public boolean isMemHardwall() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0)); return output > 0; } public int getMemPressure() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)) - .get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0); return Integer.parseInt(output); } public void setMemPressureEnabled(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0)); } public boolean isMemPressureEnabled() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, - CPUSET_MEMORY_PRESSURE_ENABLED)).get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0)); return output > 0; } public void setMemSpreadPage(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0)); } public boolean isMemSpreadPage() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0)); return output > 0; } public void setMemSpreadSlab(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0)); } public boolean isMemSpreadSlab() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0)); return output > 0; } public void setSchedLoadBlance(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 1 : 0)); } public boolean isSchedLoadBlance() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0)); return output > 0; } public void setSchedRelaxDomainLevel(int value) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), - String.valueOf(value)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value)); } public int getSchedRelaxDomainLevel() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, - CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0); return Integer.parseInt(output); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java index 1832668..491fc8f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java @@ -110,8 +110,7 @@ public class DevicesCore implements CgroupCore { final int prime = 31; int result = 1; result = prime * result + accesses; - result = - prime * result + ((device == null) ? 0 : device.hashCode()); + result = prime * result + ((device == null) ? 0 : device.hashCode()); result = prime * result + type; return result; } @@ -161,27 +160,21 @@ public class DevicesCore implements CgroupCore { } } - private void setPermission(String prop, char type, Device device, - int accesses) throws IOException { + private void setPermission(String prop, char type, Device device, int accesses) throws IOException { Record record = new Record(type, device, accesses); - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), - record.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), record.toString()); } - public void setAllow(char type, Device device, int accesses) - throws IOException { + public void setAllow(char type, Device device, int accesses) throws IOException { setPermission(DEVICES_ALLOW, type, device, accesses); } - public void setDeny(char type, Device device, int accesses) - throws IOException { + public void setDeny(char type, Device device, int accesses) throws IOException { setPermission(DEVICES_DENY, type, device, accesses); } public Record[] getList() throws IOException { - List<String> output = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - DEVICES_LIST)); + List<String> output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, DEVICES_LIST)); return Record.parseRecordList(output); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java index c601c3e..e0ad3da 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java @@ -40,13 +40,11 @@ public class FreezerCore implements CgroupCore { } public void setState(State state) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), - state.name().toUpperCase()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase()); } public State getState() throws IOException { - return State.getStateValue(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, FREEZER_STATE)).get(0)); + return State.getStateValue(CgroupUtils.readFileByLine(Constants.getDir(this.dir, FREEZER_STATE)).get(0)); } public enum State { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java index a2db78c..1b37bd3 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java @@ -27,15 +27,11 @@ public class MemoryCore implements CgroupCore { public static final String MEMORY_STAT = "/memory.stat"; public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes"; - public static final String MEMORY_MEMSW_USAGE_IN_BYTES = - "/memory.memsw.usage_in_bytes"; - public static final String MEMORY_MAX_USAGE_IN_BYTES = - "/memory.max_usage_in_bytes"; - public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = - "/memory.memsw.max_usage_in_bytes"; + public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes"; + public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes"; + public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes"; public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes"; - public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = - "/memory.memsw.limit_in_bytes"; + public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes"; public static final String MEMORY_FAILCNT = "/memory.failcnt"; public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt"; public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty"; @@ -115,113 +111,78 @@ public class MemoryCore implements CgroupCore { } public Stat getStat() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_STAT)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_STAT)).get(0); Stat stat = new Stat(output); return stat; } public long getPhysicalUsage() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0)); } public long getWithSwapUsage() throws IOException { - return Long - .parseLong(CgroupUtils - .readFileByLine( - Constants.getDir(this.dir, - MEMORY_MEMSW_USAGE_IN_BYTES)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0)); } public long getMaxPhysicalUsage() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0)); } public long getMaxWithSwapUsage() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)) - .get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0)); } public void setPhysicalUsageLimit(long value) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), - String.valueOf(value)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value)); } public long getPhysicalUsageLimit() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0)); } public void setWithSwapUsageLimit(long value) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), - String.valueOf(value)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value)); } public long getWithSwapUsageLimit() throws IOException { - return Long - .parseLong(CgroupUtils - .readFileByLine( - Constants.getDir(this.dir, - MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0)); } public int getPhysicalFailCount() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0)); } public int getWithSwapFailCount() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0)); } public void clearForceEmpty() throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), - String.valueOf(0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0)); } public void setSwappiness(int value) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_SWAPPINESS), - String.valueOf(value)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value)); } public int getSwappiness() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0)); } public void setUseHierarchy(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0)); } public boolean isUseHierarchy() throws IOException { - int output = - Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)) - .get(0)); + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0)); return output > 0; } public void setOomControl(boolean flag) throws IOException { - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, MEMORY_OOM_CONTROL), - String.valueOf(flag ? 1 : 0)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0)); } public boolean isOomControl() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0); output = output.split("\n")[0].split("[\\s]")[1]; int value = Integer.parseInt(output); return value > 0; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java index dd80c0a..e7c376d 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java @@ -58,14 +58,11 @@ public class NetClsCore implements CgroupCore { StringBuilder sb = new StringBuilder("0x"); sb.append(toHex(major)); sb.append(toHex(minor)); - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString()); } public Device getClassId() throws IOException { - String output = - CgroupUtils.readFileByLine( - Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0); + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0); output = Integer.toHexString(Integer.parseInt(output)); int major = Integer.parseInt(output.substring(0, output.length() - 4)); int minor = Integer.parseInt(output.substring(output.length() - 4)); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java index fd7e899..6b9b344 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java @@ -44,8 +44,7 @@ public class NetPrioCore implements CgroupCore { } public int getPrioId() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine( - Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0)); } public void setIfPrioMap(String iface, int priority) throws IOException { @@ -53,15 +52,12 @@ public class NetPrioCore implements CgroupCore { sb.append(iface); sb.append(' '); sb.append(priority); - CgroupUtils.writeFileByLine( - Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString()); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString()); } public Map<String, Integer> getIfPrioMap() throws IOException { Map<String, Integer> result = new HashMap<String, Integer>(); - List<String> strs = - CgroupUtils.readFileByLine(Constants.getDir(this.dir, - NET_PRIO_IFPRIOMAP)); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP)); for (String str : strs) { String[] strArgs = str.split(" "); result.put(strArgs[0], Integer.valueOf(strArgs[1])); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java index 6c2bd21..f788996 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java @@ -37,9 +37,8 @@ public class DefaultInimbus implements INimbus { } @Override - public Collection<WorkerSlot> allSlotsAvailableForScheduling( - Collection<SupervisorDetails> existingSupervisors, - Topologies topologies, Set<String> topologiesMissingAssignments) { + public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, + Set<String> topologiesMissingAssignments) { // TODO Auto-generated method stub Collection<WorkerSlot> result = new HashSet<WorkerSlot>(); for (SupervisorDetails detail : existingSupervisors) { @@ -50,15 +49,13 @@ public class DefaultInimbus implements INimbus { } @Override - public void assignSlots(Topologies topologies, - Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) { + public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) { // TODO Auto-generated method stub } @Override - public String getHostName( - Map<String, SupervisorDetails> existingSupervisors, String nodeId) { + public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java index 3858595..97dd079 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java @@ -17,104 +17,87 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.utils.Utils; - import com.alibaba.jstorm.cache.JStormCache; import com.alibaba.jstorm.cache.RocksDBCache; import com.alibaba.jstorm.cache.TimeoutMemCache; -import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.Cluster; -import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.cluster.StormBase; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.task.TaskInfo; -import com.alibaba.jstorm.task.error.TaskError; import com.alibaba.jstorm.utils.OSInfo; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; -public class NimbusCache{ - private static final long serialVersionUID = 1685576554130463610L; - +public class NimbusCache { private static final Logger LOG = LoggerFactory.getLogger(NimbusCache.class); - - + public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName(); public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName(); - + protected JStormCache memCache; protected JStormCache dbCache; protected StormClusterState zkCluster; - + public String getNimbusCacheClass(Map conf) { boolean isLinux = OSInfo.isLinux(); boolean isMac = OSInfo.isMac(); boolean isLocal = StormConfig.local_mode(conf); - + if (isLocal == true) { return TIMEOUT_MEM_CACHE_CLASS; } - + if (isLinux == false && isMac == false) { return TIMEOUT_MEM_CACHE_CLASS; } - + String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf); if (StringUtils.isBlank(nimbusCacheClass) == false) { return nimbusCacheClass; } - + return ROCKS_DB_CACHE_CLASS; - + } - + public NimbusCache(Map conf, StormClusterState zkCluster) { super(); - + String dbCacheClass = getNimbusCacheClass(conf); LOG.info("NimbusCache db Cache will use {}", dbCacheClass); - + try { - dbCache = (JStormCache)Utils.newInstance(dbCacheClass); - + dbCache = (JStormCache) Utils.newInstance(dbCacheClass); + String dbDir = StormConfig.masterDbDir(conf); conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir); - + conf.put(RocksDBCache.ROCKSDB_RESET, ConfigExtension.getNimbusCacheReset(conf)); - + dbCache.init(conf); - + if (dbCache instanceof TimeoutMemCache) { memCache = dbCache; - }else { + } else { memCache = new TimeoutMemCache(); memCache.init(conf); } - }catch(java.lang.UnsupportedClassVersionError e) { - - if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) { - LOG.error("!!!Please update jdk version to 7 or higher!!!"); - - } - LOG.error("Failed to create NimbusCache!", e); + } catch (UnsupportedClassVersionError e) { + + if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) { + LOG.error("!!!Please update jdk version to 7 or higher!!!"); + + } + LOG.error("Failed to create NimbusCache!", e); throw new RuntimeException(e); } catch (Exception e) { LOG.error("Failed to create NimbusCache!", e); throw new RuntimeException(e); } - + this.zkCluster = zkCluster; } @@ -128,19 +111,15 @@ public class NimbusCache{ public void cleanup() { dbCache.cleanup(); - + } - - + /** * - * In the old design, - * DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology + * In the old design, DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology * - * But the latest zk design, taskInfo is only one znode, taskErros has few znode - * So remove them from DBCache - * Skip timely refresh taskInfo/taskErrors + * But the latest zk design, taskInfo is only one znode, taskErros has few znode So remove them from DBCache Skip timely refresh taskInfo/taskErrors * */ - + }
