http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Time.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Time.java b/jstorm-core/src/main/java/backtype/storm/utils/Time.java index 50a79fd..8732008 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/Time.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/Time.java @@ -24,86 +24,87 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class Time { - public static Logger LOG = LoggerFactory.getLogger(Time.class); - + public static Logger LOG = LoggerFactory.getLogger(Time.class); + private static AtomicBoolean simulating = new AtomicBoolean(false); - //TODO: should probably use weak references here or something + // TODO: should probably use weak references here or something private static volatile Map<Thread, AtomicLong> threadSleepTimes; private static final Object sleepTimesLock = new Object(); - - private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing? - + + private static AtomicLong simulatedCurrTimeMs; // should this be a thread local that's allowed to keep advancing? + public static void startSimulating() { - synchronized(sleepTimesLock) { + synchronized (sleepTimesLock) { simulating.set(true); simulatedCurrTimeMs = new AtomicLong(0); threadSleepTimes = new ConcurrentHashMap<Thread, AtomicLong>(); } } - + public static void stopSimulating() { - synchronized(sleepTimesLock) { - simulating.set(false); - threadSleepTimes = null; + synchronized (sleepTimesLock) { + simulating.set(false); + threadSleepTimes = null; } } - + public static boolean isSimulating() { return simulating.get(); } - + public static void sleepUntil(long targetTimeMs) throws InterruptedException { - if(simulating.get()) { + if (simulating.get()) { try { - synchronized(sleepTimesLock) { + synchronized (sleepTimesLock) { threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs)); } - while(simulatedCurrTimeMs.get() < targetTimeMs) { + while (simulatedCurrTimeMs.get() < targetTimeMs) { Thread.sleep(10); } } finally { - synchronized(sleepTimesLock) { + synchronized (sleepTimesLock) { if (simulating.get()) { threadSleepTimes.remove(Thread.currentThread()); } } } } else { - long sleepTime = targetTimeMs-currentTimeMillis(); - if(sleepTime>0) + long sleepTime = targetTimeMs - currentTimeMillis(); + if (sleepTime > 0) Thread.sleep(sleepTime); } } - + public static void sleep(long ms) throws InterruptedException { - sleepUntil(currentTimeMillis()+ms); + sleepUntil(currentTimeMillis() + ms); } - + public static long currentTimeMillis() { - if(simulating.get()) { + if (simulating.get()) { return simulatedCurrTimeMs.get(); } else { return System.currentTimeMillis(); } } - + public static int currentTimeSecs() { return (int) (currentTimeMillis() / 1000); } - + public static void advanceTime(long ms) { - if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); + if (!simulating.get()) + throw new IllegalStateException("Cannot simulate time unless in simulation mode"); simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms); } - + public static boolean isThreadWaiting(Thread t) { - if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode"); + if (!simulating.get()) + throw new IllegalStateException("Must be in simulation mode"); AtomicLong time; - synchronized(sleepTimesLock) { + synchronized (sleepTimesLock) { time = threadSleepTimes.get(t); } - return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue(); - } + return !t.isAlive() || time != null && currentTimeMillis() < time.longValue(); + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java index f0a194f..a29a954 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java @@ -37,18 +37,18 @@ import java.util.Set; public class TimeCacheMap<K, V> { // this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; - + @Deprecated public static interface ExpiredCallback<K, V> { public void expire(K key, V val); } - + private LinkedList<HashMap<K, V>> _buckets; - + private final Object _lock = new Object(); private Thread _cleaner; private ExpiredCallback _callback; - + public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); @@ -57,7 +57,7 @@ public class TimeCacheMap<K, V> { for (int i = 0; i < numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } - + _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets - 1); @@ -78,26 +78,26 @@ public class TimeCacheMap<K, V> { } } } catch (InterruptedException ex) { - + } } }); _cleaner.setDaemon(true); _cleaner.start(); } - + public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) { this(expirationSecs, DEFAULT_NUM_BUCKETS, callback); } - + public TimeCacheMap(int expirationSecs) { this(expirationSecs, DEFAULT_NUM_BUCKETS); } - + public TimeCacheMap(int expirationSecs, int numBuckets) { this(expirationSecs, numBuckets, null); } - + public boolean containsKey(K key) { synchronized (_lock) { for (HashMap<K, V> bucket : _buckets) { @@ -108,7 +108,7 @@ public class TimeCacheMap<K, V> { return false; } } - + public V get(K key) { synchronized (_lock) { for (HashMap<K, V> bucket : _buckets) { @@ -119,7 +119,7 @@ public class TimeCacheMap<K, V> { return null; } } - + public void put(K key, V value) { synchronized (_lock) { Iterator<HashMap<K, V>> it = _buckets.iterator(); @@ -131,7 +131,7 @@ public class TimeCacheMap<K, V> { } } } - + public Object remove(K key) { synchronized (_lock) { for (HashMap<K, V> bucket : _buckets) { @@ -142,7 +142,7 @@ public class TimeCacheMap<K, V> { return null; } } - + public int size() { synchronized (_lock) { int size = 0; @@ -152,11 +152,11 @@ public class TimeCacheMap<K, V> { return size; } } - + public void cleanup() { _cleaner.interrupt(); } - + public Set<K> keySet() { Set<K> ret = new HashSet<K>(); synchronized (_lock) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java index 4638117..48b39b7 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java @@ -26,88 +26,88 @@ import backtype.storm.messaging.TaskMessage; public class TransferDrainer { - private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); - - public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) { - for (String key : workerTupleSetMap.keySet()) { - - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); - if (null == bundle) { - bundle = new ArrayList<ArrayList<TaskMessage>>(); - bundles.put(key, bundle); - } - - ArrayList tupleSet = workerTupleSetMap.get(key); - if (null != tupleSet && tupleSet.size() > 0) { - bundle.add(tupleSet); - } - } - } - - public void send(HashMap<String, IConnection> connections) { - for (String hostPort : bundles.keySet()) { - IConnection connection = connections.get(hostPort); - if (null != connection) { - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort); - for (ArrayList<TaskMessage> list : bundle) { - connection.send(list); + private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); + + public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) { + for (String key : workerTupleSetMap.keySet()) { + + ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); + if (null == bundle) { + bundle = new ArrayList<ArrayList<TaskMessage>>(); + bundles.put(key, bundle); + } + + ArrayList tupleSet = workerTupleSetMap.get(key); + if (null != tupleSet && tupleSet.size() > 0) { + bundle.add(tupleSet); + } } - - } - } - } - - private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) { - - if (null == bundle) { - return null; } - - return new Iterator<TaskMessage> () { - - private int offset = 0; - private int size = 0; - { - for (ArrayList<TaskMessage> list : bundle) { - size += list.size(); - } - } - - private int bundleOffset = 0; - private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator(); - - @Override - public boolean hasNext() { - if (offset < size) { - return true; - } - return false; - } - - @Override - public TaskMessage next() { - TaskMessage msg = null; - if (iter.hasNext()) { - msg = iter.next(); - } else { - bundleOffset++; - iter = bundle.get(bundleOffset).iterator(); - msg = iter.next(); + + public void send(HashMap<String, IConnection> connections) { + for (String hostPort : bundles.keySet()) { + IConnection connection = connections.get(hostPort); + if (null != connection) { + ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort); + for (ArrayList<TaskMessage> list : bundle) { + connection.send(list); + } + + } } - if (null != msg) { - offset++; + } + + private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) { + + if (null == bundle) { + return null; } - return msg; - } - - @Override - public void remove() { - throw new RuntimeException("not supported"); - } - }; - } - - public void clear() { - bundles.clear(); - } + + return new Iterator<TaskMessage>() { + + private int offset = 0; + private int size = 0; + { + for (ArrayList<TaskMessage> list : bundle) { + size += list.size(); + } + } + + private int bundleOffset = 0; + private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator(); + + @Override + public boolean hasNext() { + if (offset < size) { + return true; + } + return false; + } + + @Override + public TaskMessage next() { + TaskMessage msg = null; + if (iter.hasNext()) { + msg = iter.next(); + } else { + bundleOffset++; + iter = bundle.get(bundleOffset).iterator(); + msg = iter.next(); + } + if (null != msg) { + offset++; + } + return msg; + } + + @Override + public void remove() { + throw new RuntimeException("not supported"); + } + }; + } + + public void clear() { + bundles.clear(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java index 45725c9..ce2a0b3 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java @@ -22,9 +22,9 @@ import backtype.storm.tuple.Tuple; public class TupleHelpers { private TupleHelpers() { - + } - + public static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java index f9fb2c0..80b78d8 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java @@ -22,14 +22,13 @@ import backtype.storm.tuple.Tuple; public final class TupleUtils { - private TupleUtils() { - // No instantiation - } + private TupleUtils() { + // No instantiation + } - public static boolean isTick(Tuple tuple) { - return tuple != null - && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent()) - && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); - } + public static boolean isTick(Tuple tuple) { + return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java index 0669cfb..9194d07 100644 --- a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java @@ -17,45 +17,21 @@ */ package backtype.storm.utils; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Constructor; -import java.net.URL; -import java.net.URLClassLoader; -import java.net.URLDecoder; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.TreeMap; -import java.util.UUID; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - +import backtype.storm.Config; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.ComponentObject; +import backtype.storm.generated.StormTopology; +import backtype.storm.serialization.DefaultSerializationDelegate; +import backtype.storm.serialization.SerializationDelegate; +import clojure.lang.IFn; +import clojure.lang.RT; +import com.alibaba.jstorm.utils.LoadConf; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.commons.io.input.ClassLoaderObjectInputStream; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.thrift.TException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -64,20 +40,19 @@ import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.SafeConstructor; -import backtype.storm.Config; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.StormTopology; -import backtype.storm.serialization.DefaultSerializationDelegate; -import backtype.storm.serialization.SerializationDelegate; -import clojure.lang.IFn; -import clojure.lang.RT; - -import com.alibaba.jstorm.utils.LoadConf; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import java.io.*; +import java.lang.reflect.Constructor; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLDecoder; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.*; +import java.util.Map.Entry; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); @@ -99,7 +74,7 @@ public class Utils { } } - public static Object newInstance(String klass, Object ...params) { + public static Object newInstance(String klass, Object... params) { try { Class c = Class.forName(klass); Constructor[] constructors = c.getConstructors(); @@ -111,7 +86,7 @@ public class Utils { break; } } - + if (con == null) { throw new RuntimeException("Cound not found the corresponding constructor, params=" + params.toString()); } else { @@ -128,35 +103,34 @@ public class Utils { /** * Go thrift gzip serializer + * * @param obj * @return */ public static byte[] serialize(Object obj) { /** - * @@@ - * JStorm disable the thrift.gz.serializer + * @@@ JStorm disable the thrift.gz.serializer */ - //return serializationDelegate.serialize(obj); + // return serializationDelegate.serialize(obj); return javaSerialize(obj); } /** * Go thrift gzip serializer - * @param obj + * * @return */ public static <T> T deserialize(byte[] serialized, Class<T> clazz) { /** - * @@@ - * JStorm disable the thrift.gz.serializer + * @@@ JStorm disable the thrift.gz.serializer */ - //return serializationDelegate.deserialize(serialized, clazz); - return (T)javaDeserialize(serialized); + // return serializationDelegate.deserialize(serialized, clazz); + return (T) javaDeserialize(serialized); } public static byte[] javaSerialize(Object obj) { if (obj instanceof byte[]) { - return (byte[])obj; + return (byte[]) obj; } try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -168,7 +142,7 @@ public class Utils { throw new RuntimeException(e); } } - + public static Object maybe_deserialize(byte[] data) { if (data == null || data.length == 0) { return null; @@ -179,9 +153,10 @@ public class Utils { return null; } } - + /** * Deserialized with ClassLoader + * * @param serialized * @param loader * @return @@ -206,20 +181,20 @@ public class Utils { throw new RuntimeException(e); } } - + public static Object javaDeserialize(byte[] serialized) { return javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance()); } - + public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) { - return (T)javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance()); + return (T) javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance()); } - + public static String to_json(Object m) { // return JSON.toJSONString(m); return JSONValue.toJSONString(m); } - + public static Object from_json(String json) { if (json == null) { return null; @@ -228,14 +203,14 @@ public class Utils { return JSONValue.parse(json); } } - + public static String toPrettyJsonString(Object obj) { Gson gson2 = new GsonBuilder().setPrettyPrinting().create(); String ret = gson2.toJson(obj); - + return ret; } - + public static byte[] gzip(byte[] data) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -269,9 +244,9 @@ public class Utils { public static <T> String join(Iterable<T> coll, String sep) { Iterator<T> it = coll.iterator(); String ret = ""; - while(it.hasNext()) { + while (it.hasNext()) { ret = ret + it.next(); - if(it.hasNext()) { + if (it.hasNext()) { ret = ret + sep; } } @@ -281,13 +256,14 @@ public class Utils { public static void sleep(long millis) { try { Time.sleep(millis); - } catch(InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * Please directly use LoadConf.findResources(name); + * * @param name * @return */ @@ -298,6 +274,7 @@ public class Utils { /** * Please directly use LoadConf.findAndReadYaml(name); + * * @param name * @return */ @@ -306,9 +283,8 @@ public class Utils { return LoadConf.findAndReadYaml(name, mustExist, false); } - public static Map findAndReadConfigFile(String name) { - return LoadConf.findAndReadYaml(name, true, false); + return LoadConf.findAndReadYaml(name, true, false); } public static Map readDefaultConfig() { @@ -318,7 +294,7 @@ public class Utils { public static Map readCommandLineOpts() { Map ret = new HashMap(); String commandOptions = System.getProperty("storm.options"); - if(commandOptions != null) { + if (commandOptions != null) { String[] configs = commandOptions.split(","); for (String config : configs) { config = URLDecoder.decode(config); @@ -335,22 +311,21 @@ public class Utils { return ret; } - public static void replaceLocalDir(Map<Object, Object> conf) { String stormHome = System.getProperty("jstorm.home"); boolean isEmpty = StringUtils.isBlank(stormHome); - + Map<Object, Object> replaceMap = new HashMap<Object, Object>(); - + for (Entry entry : conf.entrySet()) { Object key = entry.getKey(); Object value = entry.getValue(); - + if (value instanceof String) { if (StringUtils.isBlank((String) value) == true) { continue; } - + String str = (String) value; if (isEmpty == true) { // replace %JSTORM_HOME% as current directory @@ -358,20 +333,20 @@ public class Utils { } else { str = str.replace("%JSTORM_HOME%", stormHome); } - + replaceMap.put(key, str); } } - + conf.putAll(replaceMap); } - + public static Map loadDefinedConf(String confFile) { File file = new File(confFile); if (file.exists() == false) { return findAndReadConfigFile(confFile, true); } - + Yaml yaml = new Yaml(); Map ret; try { @@ -381,10 +356,10 @@ public class Utils { } if (ret == null) ret = new HashMap(); - + return new HashMap(ret); } - + public static Map readStormConfig() { Map ret = readDefaultConfig(); String confFile = System.getProperty("storm.conf.file"); @@ -396,40 +371,41 @@ public class Utils { } ret.putAll(storm); ret.putAll(readCommandLineOpts()); - + replaceLocalDir(ret); return ret; } private static Object normalizeConf(Object conf) { - if(conf==null) return new HashMap(); - if(conf instanceof Map) { + if (conf == null) + return new HashMap(); + if (conf instanceof Map) { Map confMap = new HashMap((Map) conf); - for(Object key: confMap.keySet()) { + for (Object key : confMap.keySet()) { Object val = confMap.get(key); confMap.put(key, normalizeConf(val)); } return confMap; - } else if(conf instanceof List) { - List confList = new ArrayList((List) conf); - for(int i=0; i<confList.size(); i++) { + } else if (conf instanceof List) { + List confList = new ArrayList((List) conf); + for (int i = 0; i < confList.size(); i++) { Object val = confList.get(i); confList.set(i, normalizeConf(val)); } return confList; } else if (conf instanceof Integer) { return ((Integer) conf).longValue(); - } else if(conf instanceof Float) { + } else if (conf instanceof Float) { return ((Float) conf).doubleValue(); } else { return conf; } } - + public static boolean isValidConf(Map<String, Object> stormConf) { return normalizeConf(stormConf).equals(normalizeConf(Utils.from_json(Utils.to_json(stormConf)))); } - + public static Object getSetComponentObject(ComponentObject obj, URLClassLoader loader) { if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) { return javaDeserializeWithCL(obj.get_serialized_java(), loader); @@ -439,7 +415,7 @@ public class Utils { return obj.get_shell(); } } - + public static <S, T> T get(Map<S, T> m, S key, T def) { T ret = m.get(key); if (ret == null) { @@ -447,7 +423,7 @@ public class Utils { } return ret; } - + public static List<Object> tuple(Object... values) { List<Object> ret = new ArrayList<Object>(); for (Object v : values) { @@ -455,7 +431,7 @@ public class Utils { } return ret; } - + public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException { WritableByteChannel out = null; NimbusClient client = null; @@ -478,12 +454,12 @@ public class Utils { client.close(); } } - + public static IFn loadClojureFn(String namespace, String name) { try { - clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); + clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); } catch (Exception e) { - //if playing from the repl and defining functions, file won't exist + // if playing from the repl and defining functions, file won't exist } return (IFn) RT.var(namespace, name).deref(); } @@ -494,38 +470,38 @@ public class Utils { public static <K, V> Map<V, K> reverseMap(Map<K, V> map) { Map<V, K> ret = new HashMap<V, K>(); - for(K key: map.keySet()) { + for (K key : map.keySet()) { ret.put(map.get(key), key); } return ret; } public static ComponentCommon getComponentCommon(StormTopology topology, String id) { - if(topology.get_spouts().containsKey(id)) { + if (topology.get_spouts().containsKey(id)) { return topology.get_spouts().get(id).get_common(); } - if(topology.get_bolts().containsKey(id)) { + if (topology.get_bolts().containsKey(id)) { return topology.get_bolts().get(id).get_common(); } - if(topology.get_state_spouts().containsKey(id)) { + if (topology.get_state_spouts().containsKey(id)) { return topology.get_state_spouts().get(id).get_common(); } throw new IllegalArgumentException("Could not find component with id " + id); } public static Integer getInt(Object o) { - Integer result = getInt(o, null); - if (null == result) { - throw new IllegalArgumentException("Don't know how to convert null to int"); - } - return result; + Integer result = getInt(o, null); + if (null == result) { + throw new IllegalArgumentException("Don't know how to convert null to int"); + } + return result; } - + public static Integer getInt(Object o, Integer defaultValue) { if (null == o) { return defaultValue; } - + if (o instanceof Number) { return ((Number) o).intValue(); } else if (o instanceof String) { @@ -534,38 +510,18 @@ public class Utils { throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); } } - + public static long secureRandomLong() { return UUID.randomUUID().getLeastSignificantBits(); } - - public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry { - - protected final int maxRetryInterval; - - public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepTimeMs) { - super(baseSleepTimeMs, maxRetries); - this.maxRetryInterval = maxSleepTimeMs; - } - - public int getMaxRetryInterval() { - return this.maxRetryInterval; - } - - @Override - public int getSleepTimeMs(int count, long elapsedMs) { - return Math.min(maxRetryInterval, super.getSleepTimeMs(count, elapsedMs)); - } - - } - + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) { return newCurator(conf, servers, port, root, null); } public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { List<String> serverPorts = new ArrayList<String>(); - for(String zkServer: (List<String>) servers) { + for (String zkServer : (List<String>) servers) { serverPorts.add(zkServer + ":" + Utils.getInt(port)); } String zkStr = StringUtils.join(serverPorts, ",") + root; @@ -576,17 +532,15 @@ public class Utils { return builder.build(); } - protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) - { + protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) { builder.connectString(zkStr) - .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) - .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) - .retryPolicy(new StormBoundedExponentialBackoffRetry( - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); - - if(auth!=null && auth.scheme!=null && auth.payload!=null) { + .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) + .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) + .retryPolicy( + new StormBoundedExponentialBackoffRetry(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf + .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); + + if (auth != null && auth.scheme != null && auth.payload != null) { builder = builder.authorization(auth.scheme, auth.payload); } } @@ -608,15 +562,10 @@ public class Utils { } /** - * -(defn integer-divided [sum num-pieces] - (let [base (int (/ sum num-pieces)) - num-inc (mod sum num-pieces) - num-bases (- num-pieces num-inc)] - (if (= num-inc 0) - {base num-bases} - {base num-bases (inc base) num-inc} - ))) + * + (defn integer-divided [sum num-pieces] (let [base (int (/ sum num-pieces)) num-inc (mod sum num-pieces) num-bases (- num-pieces num-inc)] (if (= num-inc + * 0) {base num-bases} {base num-bases (inc base) num-inc} ))) + * * @param sum * @param numPieces * @return @@ -628,8 +577,8 @@ public class Utils { int numBases = numPieces - numInc; TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>(); ret.put(base, numBases); - if(numInc!=0) { - ret.put(base+1, numInc); + if (numInc != 0) { + ret.put(base + 1, numInc); } return ret; } @@ -644,7 +593,7 @@ public class Utils { try { BufferedReader r = new BufferedReader(new InputStreamReader(in)); String line = null; - while ((line = r.readLine())!= null) { + while ((line = r.readLine()) != null) { LOG.info("{}:{}", prefix, line); } } catch (IOException e) { @@ -654,8 +603,8 @@ public class Utils { public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) { Throwable t = throwable; - while(t != null) { - if(klass.isInstance(t)) { + while (t != null) { + if (klass.isInstance(t)) { return true; } t = t.getCause(); @@ -664,71 +613,70 @@ public class Utils { } /** - * Is the cluster configured to interact with ZooKeeper in a secure way? - * This only works when called from within Nimbus or a Supervisor process. + * Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process. + * * @param conf the storm configuration, not the topology configuration * @return true if it is configured else false. */ public static boolean isZkAuthenticationConfiguredStormServer(Map conf) { return null != System.getProperty("java.security.auth.login.config") - || (conf != null - && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null - && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty()); + || (conf != null && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null && !((String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty()); } /** * Is the topology configured to have ZooKeeper authentication. + * * @param conf the topology configuration * @return true if ZK is configured else false */ public static boolean isZkAuthenticationConfiguredTopology(Map conf) { - return (conf != null - && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null - && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); + return (conf != null && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null && !((String) conf + .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); } public static List<ACL> getWorkerACL(Map conf) { - //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms + // This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct + // perms if (!isZkAuthenticationConfiguredTopology(conf)) { return null; } - String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL); + String stormZKUser = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL); if (stormZKUser == null) { - throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set"); + throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set"); } - String[] split = stormZKUser.split(":",2); + String[] split = stormZKUser.split(":", 2); if (split.length != 2) { - throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); + throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); } ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL); ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1]))); return ret; } - public static String threadDump() { - final StringBuilder dump = new StringBuilder(); - final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); - final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); - for (java.lang.management.ThreadInfo threadInfo : threadInfos) { - dump.append('"'); - dump.append(threadInfo.getThreadName()); - dump.append("\" "); - final Thread.State state = threadInfo.getThreadState(); - dump.append("\n java.lang.Thread.State: "); - dump.append(state); - final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); - for (final StackTraceElement stackTraceElement : stackTraceElements) { - dump.append("\n at "); - dump.append(stackTraceElement); - } - dump.append("\n\n"); - } - return dump.toString(); - } + public static String threadDump() { + final StringBuilder dump = new StringBuilder(); + final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); + final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); + for (java.lang.management.ThreadInfo threadInfo : threadInfos) { + dump.append('"'); + dump.append(threadInfo.getThreadName()); + dump.append("\" "); + final Thread.State state = threadInfo.getThreadState(); + dump.append("\n java.lang.Thread.State: "); + dump.append(state); + final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); + for (final StackTraceElement stackTraceElement : stackTraceElements) { + dump.append("\n at "); + dump.append(stackTraceElement); + } + dump.append("\n\n"); + } + return dump.toString(); + } // Assumes caller is synchronizing private static SerializationDelegate getSerializationDelegate(Map stormConf) { - String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE); + String delegateClassName = (String) stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE); SerializationDelegate delegate; try { Class delegateClass = Class.forName(delegateClassName); @@ -747,27 +695,25 @@ public class Utils { return delegate; } - public static void handleUncaughtException(Throwable t) { - if (t != null && t instanceof Error) { - if (t instanceof OutOfMemoryError) { - try { - System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); - } catch (Throwable err) { - //Again we don't want to exit because of logging issues. + public static void handleUncaughtException(Throwable t) { + if (t != null && t instanceof Error) { + if (t instanceof OutOfMemoryError) { + try { + System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); + } catch (Throwable err) { + // Again we don't want to exit because of logging issues. + } + Runtime.getRuntime().halt(-1); + } else { + // Running in daemon mode, we would pass Error to calling thread. + throw (Error) t; + } } - Runtime.getRuntime().halt(-1); - } else { - //Running in daemon mode, we would pass Error to calling thread. - throw (Error) t; - } } - } - - public static List<String> tokenize_path(String path) { String[] toks = path.split("/"); - java.util.ArrayList<String> rtn = new ArrayList<String>(); + ArrayList<String> rtn = new ArrayList<String>(); for (String str : toks) { if (!str.isEmpty()) { rtn.add(str); @@ -775,7 +721,7 @@ public class Utils { } return rtn; } - + public static String toks_to_path(List<String> toks) { StringBuffer buff = new StringBuffer(); buff.append("/"); @@ -785,16 +731,16 @@ public class Utils { if (i < (size - 1)) { buff.append("/"); } - + } return buff.toString(); } - + public static String normalize_path(String path) { String rtn = toks_to_path(tokenize_path(path)); return rtn; } - + public static String printStack() { StringBuilder sb = new StringBuilder(); sb.append("\nCurrent call stack:\n"); @@ -802,14 +748,14 @@ public class Utils { for (int i = 2; i < stackElements.length; i++) { sb.append("\t").append(stackElements[i]).append("\n"); } - + return sb.toString(); } - + private static Map loadProperty(String prop) { Map ret = new HashMap<Object, Object>(); Properties properties = new Properties(); - + try { InputStream stream = new FileInputStream(prop); properties.load(stream); @@ -826,14 +772,14 @@ public class Utils { e1.printStackTrace(); throw new RuntimeException(e1.getMessage()); } - + return ret; } - + private static Map loadYaml(String confPath) { Map ret = new HashMap<Object, Object>(); Yaml yaml = new Yaml(); - + try { InputStream stream = new FileInputStream(confPath); ret = (Map) yaml.load(stream); @@ -848,10 +794,10 @@ public class Utils { e1.printStackTrace(); throw new RuntimeException("Failed to read config file"); } - + return ret; } - + public static Map loadConf(String arg) { Map ret = null; if (arg.endsWith("yaml")) { @@ -866,13 +812,11 @@ public class Utils { String ret = ""; InputStream input = null; try { - input = - Utils.class.getClassLoader().getResourceAsStream("version"); + input = Utils.class.getClassLoader().getResourceAsStream("version"); BufferedReader in = new BufferedReader(new InputStreamReader(input)); - String s = in.readLine(); - ret = s.trim(); - - + String s = in.readLine(); + ret = s.trim(); + } catch (Exception e) { LOG.warn("Failed to get version", e); } finally { @@ -892,7 +836,7 @@ public class Utils { bytes[offset++] = (byte) (value & 0x000000FF); bytes[offset++] = (byte) ((value & 0x0000FF00) >> 8); bytes[offset++] = (byte) ((value & 0x00FF0000) >> 16); - bytes[offset] = (byte) ((value & 0xFF000000) >> 24); + bytes[offset] = (byte) ((value & 0xFF000000) >> 24); } public static int readIntFromByteArray(byte[] bytes, int offset) { @@ -900,7 +844,7 @@ public class Utils { ret = ret | (bytes[offset++] & 0x000000FF); ret = ret | ((bytes[offset++] << 8) & 0x0000FF00); ret = ret | ((bytes[offset++] << 16) & 0x00FF0000); - ret = ret | ((bytes[offset] << 24) & 0xFF000000); + ret = ret | ((bytes[offset] << 24) & 0xFF000000); return ret; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java index 1740e18..456dfd0 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java @@ -24,108 +24,102 @@ import java.util.Properties; public class VersionInfo { - private Properties info; - - protected VersionInfo(String component) { - info = new Properties(); - String versionInfoFile = component + "-version-info.properties"; - InputStream is = null; - try { - is = Thread.currentThread().getContextClassLoader() - .getResourceAsStream(versionInfoFile); - if (is == null) { - throw new IOException("Resource not found"); - } - info.load(is); - } catch (IOException ex) { - } finally { - if (is != null) { - try { - - is.close(); - } catch (IOException ioex) { - } - - } - } - } - - protected String _getVersion() { - return info.getProperty("version", "Unknown"); - } - - protected String _getRevision() { - return info.getProperty("revision", "Unknown"); - } - - protected String _getBranch() { - return info.getProperty("branch", "Unknown"); - } - - protected String _getDate() { - return info.getProperty("date", "Unknown"); - } - - protected String _getUser() { - return info.getProperty("user", "Unknown"); - } - - protected String _getUrl() { - return info.getProperty("url", "Unknown"); - } - - protected String _getSrcChecksum() { - return info.getProperty("srcChecksum", "Unknown"); - } - - protected String _getBuildVersion(){ - return getVersion() + - " from " + _getRevision() + - " by " + _getUser() + - " source checksum " + _getSrcChecksum(); - } - - - private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core"); - - public static String getVersion() { - return COMMON_VERSION_INFO._getVersion(); - } - - public static String getRevision() { - return COMMON_VERSION_INFO._getRevision(); - } - - public static String getBranch() { - return COMMON_VERSION_INFO._getBranch(); - } - - public static String getDate() { - return COMMON_VERSION_INFO._getDate(); - } - - public static String getUser() { - return COMMON_VERSION_INFO._getUser(); - } - - public static String getUrl() { - return COMMON_VERSION_INFO._getUrl(); - } - - public static String getSrcChecksum() { - return COMMON_VERSION_INFO._getSrcChecksum(); - } - - public static String getBuildVersion(){ - return COMMON_VERSION_INFO._getBuildVersion(); - } - - - public static void main(String[] args) { - System.out.println("Storm " + getVersion()); - System.out.println("URL " + getUrl() + " -r " + getRevision()); - System.out.println("Branch " + getBranch()); - System.out.println("Compiled by " + getUser() + " on " + getDate()); - System.out.println("From source with checksum " + getSrcChecksum()); - } + private Properties info; + + protected VersionInfo(String component) { + info = new Properties(); + String versionInfoFile = component + "-version-info.properties"; + InputStream is = null; + try { + is = Thread.currentThread().getContextClassLoader().getResourceAsStream(versionInfoFile); + if (is == null) { + throw new IOException("Resource not found"); + } + info.load(is); + } catch (IOException ex) { + } finally { + if (is != null) { + try { + + is.close(); + } catch (IOException ioex) { + } + + } + } + } + + protected String _getVersion() { + return info.getProperty("version", "Unknown"); + } + + protected String _getRevision() { + return info.getProperty("revision", "Unknown"); + } + + protected String _getBranch() { + return info.getProperty("branch", "Unknown"); + } + + protected String _getDate() { + return info.getProperty("date", "Unknown"); + } + + protected String _getUser() { + return info.getProperty("user", "Unknown"); + } + + protected String _getUrl() { + return info.getProperty("url", "Unknown"); + } + + protected String _getSrcChecksum() { + return info.getProperty("srcChecksum", "Unknown"); + } + + protected String _getBuildVersion() { + return getVersion() + " from " + _getRevision() + " by " + _getUser() + " source checksum " + _getSrcChecksum(); + } + + private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core"); + + public static String getVersion() { + return COMMON_VERSION_INFO._getVersion(); + } + + public static String getRevision() { + return COMMON_VERSION_INFO._getRevision(); + } + + public static String getBranch() { + return COMMON_VERSION_INFO._getBranch(); + } + + public static String getDate() { + return COMMON_VERSION_INFO._getDate(); + } + + public static String getUser() { + return COMMON_VERSION_INFO._getUser(); + } + + public static String getUrl() { + return COMMON_VERSION_INFO._getUrl(); + } + + public static String getSrcChecksum() { + return COMMON_VERSION_INFO._getSrcChecksum(); + } + + public static String getBuildVersion() { + return COMMON_VERSION_INFO._getBuildVersion(); + } + + public static void main(String[] args) { + System.out.println("Storm " + getVersion()); + System.out.println("URL " + getUrl() + " -r " + getRevision()); + System.out.println("Branch " + getBranch()); + System.out.println("Compiled by " + getUser() + " on " + getDate()); + System.out.println("From source with checksum " + getSrcChecksum()); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java index 07ce5a8..0852292 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java @@ -30,10 +30,10 @@ public class VersionedStore { private static final String FINISHED_VERSION_SUFFIX = ".version"; private String _root; - + public VersionedStore(String path) throws IOException { - _root = path; - mkdirs(_root); + _root = path; + mkdirs(_root); } public String getRoot() { @@ -46,26 +46,30 @@ public class VersionedStore { public String mostRecentVersionPath() throws IOException { Long v = mostRecentVersion(); - if(v==null) return null; + if (v == null) + return null; return versionPath(v); } public String mostRecentVersionPath(long maxVersion) throws IOException { Long v = mostRecentVersion(maxVersion); - if(v==null) return null; + if (v == null) + return null; return versionPath(v); } public Long mostRecentVersion() throws IOException { List<Long> all = getAllVersions(); - if(all.size()==0) return null; + if (all.size() == 0) + return null; return all.get(0); } public Long mostRecentVersion(long maxVersion) throws IOException { List<Long> all = getAllVersions(); - for(Long v: all) { - if(v <= maxVersion) return v; + for (Long v : all) { + if (v <= maxVersion) + return v; } return null; } @@ -73,7 +77,7 @@ public class VersionedStore { public String createVersion() throws IOException { Long mostRecent = mostRecentVersion(); long version = Time.currentTimeMillis(); - if(mostRecent!=null && version <= mostRecent) { + if (mostRecent != null && version <= mostRecent) { version = mostRecent + 1; } return createVersion(version); @@ -81,7 +85,7 @@ public class VersionedStore { public String createVersion(long version) throws IOException { String ret = versionPath(version); - if(getAllVersions().contains(version)) + if (getAllVersions().contains(version)) throw new RuntimeException("Version already exists or data already exists"); else return ret; @@ -95,11 +99,11 @@ public class VersionedStore { File versionFile = new File(versionPath(version)); File tokenFile = new File(tokenPath(version)); - if(tokenFile.exists()) { + if (tokenFile.exists()) { FileUtils.forceDelete(tokenFile); } - if(versionFile.exists()) { + if (versionFile.exists()) { FileUtils.forceDelete(versionFile); } } @@ -116,14 +120,14 @@ public class VersionedStore { public void cleanup(int versionsToKeep) throws IOException { List<Long> versions = getAllVersions(); - if(versionsToKeep >= 0) { + if (versionsToKeep >= 0) { versions = versions.subList(0, Math.min(versions.size(), versionsToKeep)); } HashSet<Long> keepers = new HashSet<Long>(versions); - for(String p: listDir(_root)) { + for (String p : listDir(_root)) { Long v = parseVersion(p); - if(v!=null && !keepers.contains(v)) { + if (v != null && !keepers.contains(v)) { deleteVersion(v); } } @@ -134,8 +138,8 @@ public class VersionedStore { */ public List<Long> getAllVersions() throws IOException { List<Long> ret = new ArrayList<Long>(); - for(String s: listDir(_root)) { - if(s.endsWith(FINISHED_VERSION_SUFFIX)) { + for (String s : listDir(_root)) { + if (s.endsWith(FINISHED_VERSION_SUFFIX)) { ret.add(validateAndGetVersion(s)); } } @@ -150,18 +154,19 @@ public class VersionedStore { private long validateAndGetVersion(String path) { Long v = parseVersion(path); - if(v==null) throw new RuntimeException(path + " is not a valid version"); + if (v == null) + throw new RuntimeException(path + " is not a valid version"); return v; } private Long parseVersion(String path) { String name = new File(path).getName(); - if(name.endsWith(FINISHED_VERSION_SUFFIX)) { - name = name.substring(0, name.length()-FINISHED_VERSION_SUFFIX.length()); + if (name.endsWith(FINISHED_VERSION_SUFFIX)) { + name = name.substring(0, name.length() - FINISHED_VERSION_SUFFIX.length()); } try { return Long.parseLong(name); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { return null; } } @@ -173,12 +178,12 @@ public class VersionedStore { private void mkdirs(String path) throws IOException { new File(path).mkdirs(); } - + private List<String> listDir(String dir) throws IOException { List<String> ret = new ArrayList<String>(); File[] contents = new File(dir).listFiles(); - if(contents!=null) { - for(File f: contents) { + if (contents != null) { + for (File f : contents) { ret.add(f.getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java index 5a288a0..6290f5a 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java @@ -22,28 +22,28 @@ public class WindowedTimeThrottler { int _maxAmt; long _windowStartTime; int _windowEvents = 0; - + public WindowedTimeThrottler(Number windowMillis, Number maxAmt) { _windowMillis = windowMillis.longValue(); _maxAmt = maxAmt.intValue(); _windowStartTime = System.currentTimeMillis(); } - + public boolean isThrottled() { resetIfNecessary(); return _windowEvents >= _maxAmt; } - - //returns void if the event should continue, false if the event should not be done + + // returns void if the event should continue, false if the event should not be done public void markEvent() { resetIfNecessary(); _windowEvents++; - + } - + private void resetIfNecessary() { long now = System.currentTimeMillis(); - if(now - _windowStartTime > _windowMillis) { + if (now - _windowStartTime > _windowMillis) { _windowStartTime = now; _windowEvents = 0; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java index f3526b1..4c2f35c 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java @@ -28,30 +28,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class WorkerClassLoader extends URLClassLoader { - + public static Logger LOG = LoggerFactory.getLogger(WorkerClassLoader.class); - + private ClassLoader defaultClassLoader; - + private ClassLoader JDKClassLoader; - + private boolean isDebug; - + protected static WorkerClassLoader instance; - + protected static boolean enable; - + protected static Map<Thread, ClassLoader> threadContextCache; - + protected WorkerClassLoader(URL[] urls, ClassLoader defaultClassLoader, ClassLoader JDKClassLoader, boolean isDebug) { super(urls, JDKClassLoader); this.defaultClassLoader = defaultClassLoader; this.JDKClassLoader = JDKClassLoader; this.isDebug = isDebug; - + // TODO Auto-generated constructor stub } - + // for all log go through logback when enable classloader protected boolean isLogByDefault(String name) { if (name.startsWith("org.apache.log4j")) { @@ -59,11 +59,11 @@ public class WorkerClassLoader extends URLClassLoader { } else if (name.startsWith("org.slf4j")) { return true; } - + return false; - + } - + protected boolean isLoadByDefault(String name) { if (name.startsWith("backtype.storm") == true) { return true; @@ -75,100 +75,101 @@ public class WorkerClassLoader extends URLClassLoader { return false; } } - + @Override public Class<?> loadClass(String name) throws ClassNotFoundException { Class<?> result = null; try { result = this.findLoadedClass(name); - + if (result != null) { return result; } - + try { result = JDKClassLoader.loadClass(name); if (result != null) return result; } catch (Exception e) { - + } - + try { if (isLoadByDefault(name) == false) { result = findClass(name); - + if (result != null) { return result; } } - + } catch (Exception e) { - + } - + result = defaultClassLoader.loadClass(name); return result; - + } finally { if (result != null) { ClassLoader resultClassLoader = result.getClassLoader(); - LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader()); + LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:" + + Thread.currentThread().getContextClassLoader()); } else { LOG.warn("Failed to load class " + name + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader()); } - + if (isDebug) { LOG.info(Utils.printStack()); } } - + } - + public static WorkerClassLoader mkInstance(URL[] urls, ClassLoader DefaultClassLoader, ClassLoader JDKClassLoader, boolean enable, boolean isDebug) { WorkerClassLoader.enable = enable; if (enable == false) { LOG.info("Don't enable UserDefine ClassLoader"); return null; } - + synchronized (WorkerClassLoader.class) { if (instance == null) { instance = new WorkerClassLoader(urls, DefaultClassLoader, JDKClassLoader, isDebug); - + threadContextCache = new ConcurrentHashMap<Thread, ClassLoader>(); } - + } - + LOG.info("Successfully create classloader " + mk_list(urls)); return instance; } - + public static WorkerClassLoader getInstance() { return instance; } - + public static boolean isEnable() { return enable; } - + public static void switchThreadContext() { if (enable == false) { return; } - + Thread thread = Thread.currentThread(); ClassLoader oldClassLoader = thread.getContextClassLoader(); threadContextCache.put(thread, oldClassLoader); thread.setContextClassLoader(instance); } - + public static void restoreThreadContext() { if (enable == false) { return; } - + Thread thread = Thread.currentThread(); ClassLoader oldClassLoader = threadContextCache.get(thread); if (oldClassLoader != null) { @@ -177,7 +178,7 @@ public class WorkerClassLoader extends URLClassLoader { LOG.info("No context classloader of " + thread.getName()); } } - + private static <V> List<V> mk_list(V... args) { ArrayList<V> rtn = new ArrayList<V>(); for (V o : args) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java index 8516f97..2c0a2a3 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java @@ -42,334 +42,314 @@ package backtype.storm.utils; import java.io.*; - import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -public final class WritableUtils { - - public static byte[] readCompressedByteArray(DataInput in) throws IOException { - int length = in.readInt(); - if (length == -1) return null; - byte[] buffer = new byte[length]; - in.readFully(buffer); // could/should use readFully(buffer,0,length)? - GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length)); - byte[] outbuf = new byte[length]; - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int len; - while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){ - bos.write(outbuf, 0, len); - } - byte[] decompressed = bos.toByteArray(); - bos.close(); - gzi.close(); - return decompressed; - } - - public static void skipCompressedByteArray(DataInput in) throws IOException { - int length = in.readInt(); - if (length != -1) { - skipFully(in, length); - } - } - - public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException { - if (bytes != null) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - GZIPOutputStream gzout = new GZIPOutputStream(bos); - gzout.write(bytes, 0, bytes.length); - gzout.close(); - byte[] buffer = bos.toByteArray(); - int len = buffer.length; - out.writeInt(len); - out.write(buffer, 0, len); - /* debug only! Once we have confidence, can lose this. */ - return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0); - } else { - out.writeInt(-1); - return -1; +public final class WritableUtils { + + public static byte[] readCompressedByteArray(DataInput in) throws IOException { + int length = in.readInt(); + if (length == -1) + return null; + byte[] buffer = new byte[length]; + in.readFully(buffer); // could/should use readFully(buffer,0,length)? + GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length)); + byte[] outbuf = new byte[length]; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int len; + while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) { + bos.write(outbuf, 0, len); + } + byte[] decompressed = bos.toByteArray(); + bos.close(); + gzi.close(); + return decompressed; } - } - - - /* Ugly utility, maybe someone else can do this better */ - public static String readCompressedString(DataInput in) throws IOException { - byte[] bytes = readCompressedByteArray(in); - if (bytes == null) return null; - return new String(bytes, "UTF-8"); - } - - - public static int writeCompressedString(DataOutput out, String s) throws IOException { - return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null); - } - - /* - * - * Write a String as a Network Int n, followed by n Bytes - * Alternative to 16 bit read/writeUTF. - * Encoding standard is... ? - * - */ - public static void writeString(DataOutput out, String s) throws IOException { - if (s != null) { - byte[] buffer = s.getBytes("UTF-8"); - int len = buffer.length; - out.writeInt(len); - out.write(buffer, 0, len); - } else { - out.writeInt(-1); - } - } - - /* - * Read a String as a Network Int n, followed by n Bytes - * Alternative to 16 bit read/writeUTF. - * Encoding standard is... ? - * - */ - public static String readString(DataInput in) throws IOException{ - int length = in.readInt(); - if (length == -1) return null; - byte[] buffer = new byte[length]; - in.readFully(buffer); // could/should use readFully(buffer,0,length)? - return new String(buffer,"UTF-8"); - } - - - /* - * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. - * Could be generalised using introspection. - * - */ - public static void writeStringArray(DataOutput out, String[] s) throws IOException{ - out.writeInt(s.length); - for(int i = 0; i < s.length; i++) { - writeString(out, s[i]); + + public static void skipCompressedByteArray(DataInput in) throws IOException { + int length = in.readInt(); + if (length != -1) { + skipFully(in, length); + } } - } - - /* - * Write a String array as a Nework Int N, followed by Int N Byte Array of - * compressed Strings. Handles also null arrays and null values. - * Could be generalised using introspection. - * - */ - public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{ - if (s == null) { - out.writeInt(-1); - return; + + public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException { + if (bytes != null) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + GZIPOutputStream gzout = new GZIPOutputStream(bos); + gzout.write(bytes, 0, bytes.length); + gzout.close(); + byte[] buffer = bos.toByteArray(); + int len = buffer.length; + out.writeInt(len); + out.write(buffer, 0, len); + /* debug only! Once we have confidence, can lose this. */ + return ((bytes.length != 0) ? (100 * buffer.length) / bytes.length : 0); + } else { + out.writeInt(-1); + return -1; + } } - out.writeInt(s.length); - for(int i = 0; i < s.length; i++) { - writeCompressedString(out, s[i]); + + /* Ugly utility, maybe someone else can do this better */ + public static String readCompressedString(DataInput in) throws IOException { + byte[] bytes = readCompressedByteArray(in); + if (bytes == null) + return null; + return new String(bytes, "UTF-8"); } - } - - /* - * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. - * Could be generalised using introspection. Actually this bit couldn't... - * - */ - public static String[] readStringArray(DataInput in) throws IOException { - int len = in.readInt(); - if (len == -1) return null; - String[] s = new String[len]; - for(int i = 0; i < len; i++) { - s[i] = readString(in); + + public static int writeCompressedString(DataOutput out, String s) throws IOException { + return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null); } - return s; - } - - - /* - * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. - * Could be generalised using introspection. Handles null arrays and null values. - * - */ - public static String[] readCompressedStringArray(DataInput in) throws IOException { - int len = in.readInt(); - if (len == -1) return null; - String[] s = new String[len]; - for(int i = 0; i < len; i++) { - s[i] = readCompressedString(in); + + /* + * + * Write a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ? + */ + public static void writeString(DataOutput out, String s) throws IOException { + if (s != null) { + byte[] buffer = s.getBytes("UTF-8"); + int len = buffer.length; + out.writeInt(len); + out.write(buffer, 0, len); + } else { + out.writeInt(-1); + } } - return s; - } - - - /* - * - * Test Utility Method Display Byte Array. - * - */ - public static void displayByteArray(byte[] record){ - int i; - for(i=0;i < record.length -1; i++){ - if (i % 16 == 0) { System.out.println(); } - System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); - System.out.print(Integer.toHexString(record[i] & 0x0F)); - System.out.print(","); + + /* + * Read a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ? + */ + public static String readString(DataInput in) throws IOException { + int length = in.readInt(); + if (length == -1) + return null; + byte[] buffer = new byte[length]; + in.readFully(buffer); // could/should use readFully(buffer,0,length)? + return new String(buffer, "UTF-8"); } - System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); - System.out.print(Integer.toHexString(record[i] & 0x0F)); - System.out.println(); - } - - - /** - * Serializes an integer to a binary stream with zero-compressed encoding. - * For -120 <= i <= 127, only one byte is used with the actual value. - * For other values of i, the first byte value indicates whether the - * integer is positive or negative, and the number of bytes that follow. - * If the first byte value v is between -121 and -124, the following integer - * is positive, with number of bytes that follow are -(v+120). - * If the first byte value v is between -125 and -128, the following integer - * is negative, with number of bytes that follow are -(v+124). Bytes are - * stored in the high-non-zero-byte-first order. - * - * @param stream Binary output stream - * @param i Integer to be serialized - * @throws java.io.IOException - */ - public static void writeVInt(DataOutput stream, int i) throws IOException { - writeVLong(stream, i); - } - - /** - * Serializes a long to a binary stream with zero-compressed encoding. - * For -112 <= i <= 127, only one byte is used with the actual value. - * For other values of i, the first byte value indicates whether the - * long is positive or negative, and the number of bytes that follow. - * If the first byte value v is between -113 and -120, the following long - * is positive, with number of bytes that follow are -(v+112). - * If the first byte value v is between -121 and -128, the following long - * is negative, with number of bytes that follow are -(v+120). Bytes are - * stored in the high-non-zero-byte-first order. - * - * @param stream Binary output stream - * @param i Long to be serialized - * @throws java.io.IOException - */ - public static void writeVLong(DataOutput stream, long i) throws IOException { - if (i >= -112 && i <= 127) { - stream.writeByte((byte)i); - return; + + /* + * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection. + */ + public static void writeStringArray(DataOutput out, String[] s) throws IOException { + out.writeInt(s.length); + for (int i = 0; i < s.length; i++) { + writeString(out, s[i]); + } } - int len = -112; - if (i < 0) { - i ^= -1L; // take one's complement' - len = -120; + /* + * Write a String array as a Nework Int N, followed by Int N Byte Array of compressed Strings. Handles also null arrays and null values. Could be + * generalised using introspection. + */ + public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException { + if (s == null) { + out.writeInt(-1); + return; + } + out.writeInt(s.length); + for (int i = 0; i < s.length; i++) { + writeCompressedString(out, s[i]); + } } - long tmp = i; - while (tmp != 0) { - tmp = tmp >> 8; - len--; + /* + * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection. Actually this bit couldn't... + */ + public static String[] readStringArray(DataInput in) throws IOException { + int len = in.readInt(); + if (len == -1) + return null; + String[] s = new String[len]; + for (int i = 0; i < len; i++) { + s[i] = readString(in); + } + return s; } - stream.writeByte((byte)len); + /* + * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection. Handles null arrays and null + * values. + */ + public static String[] readCompressedStringArray(DataInput in) throws IOException { + int len = in.readInt(); + if (len == -1) + return null; + String[] s = new String[len]; + for (int i = 0; i < len; i++) { + s[i] = readCompressedString(in); + } + return s; + } - len = (len < -120) ? -(len + 120) : -(len + 112); + /* + * + * Test Utility Method Display Byte Array. + */ + public static void displayByteArray(byte[] record) { + int i; + for (i = 0; i < record.length - 1; i++) { + if (i % 16 == 0) { + System.out.println(); + } + System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); + System.out.print(Integer.toHexString(record[i] & 0x0F)); + System.out.print(","); + } + System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); + System.out.print(Integer.toHexString(record[i] & 0x0F)); + System.out.println(); + } - for (int idx = len; idx != 0; idx--) { - int shiftbits = (idx - 1) * 8; - long mask = 0xFFL << shiftbits; - stream.writeByte((byte)((i & mask) >> shiftbits)); + /** + * Serializes an integer to a binary stream with zero-compressed encoding. For -120 <= i <= 127, only one byte is used with the actual value. For other + * values of i, the first byte value indicates whether the integer is positive or negative, and the number of bytes that follow. If the first byte value v + * is between -121 and -124, the following integer is positive, with number of bytes that follow are -(v+120). If the first byte value v is between -125 and + * -128, the following integer is negative, with number of bytes that follow are -(v+124). Bytes are stored in the high-non-zero-byte-first order. + * + * @param stream Binary output stream + * @param i Integer to be serialized + * @throws IOException + */ + public static void writeVInt(DataOutput stream, int i) throws IOException { + writeVLong(stream, i); } - } - - - /** - * Reads a zero-compressed encoded long from input stream and returns it. - * @param stream Binary input stream - * @throws java.io.IOException - * @return deserialized long from stream. - */ - public static long readVLong(DataInput stream) throws IOException { - byte firstByte = stream.readByte(); - int len = decodeVIntSize(firstByte); - if (len == 1) { - return firstByte; + + /** + * Serializes a long to a binary stream with zero-compressed encoding. For -112 <= i <= 127, only one byte is used with the actual value. For other values + * of i, the first byte value indicates whether the long is positive or negative, and the number of bytes that follow. If the first byte value v is between + * -113 and -120, the following long is positive, with number of bytes that follow are -(v+112). If the first byte value v is between -121 and -128, the + * following long is negative, with number of bytes that follow are -(v+120). Bytes are stored in the high-non-zero-byte-first order. + * + * @param stream Binary output stream + * @param i Long to be serialized + * @throws IOException + */ + public static void writeVLong(DataOutput stream, long i) throws IOException { + if (i >= -112 && i <= 127) { + stream.writeByte((byte) i); + return; + } + + int len = -112; + if (i < 0) { + i ^= -1L; // take one's complement' + len = -120; + } + + long tmp = i; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + stream.writeByte((byte) len); + + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + long mask = 0xFFL << shiftbits; + stream.writeByte((byte) ((i & mask) >> shiftbits)); + } } - long i = 0; - for (int idx = 0; idx < len-1; idx++) { - byte b = stream.readByte(); - i = i << 8; - i = i | (b & 0xFF); + + /** + * Reads a zero-compressed encoded long from input stream and returns it. + * + * @param stream Binary input stream + * @throws IOException + * @return deserialized long from stream. + */ + public static long readVLong(DataInput stream) throws IOException { + byte firstByte = stream.readByte(); + int len = decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len - 1; idx++) { + byte b = stream.readByte(); + i = i << 8; + i = i | (b & 0xFF); + } + return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); } - return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); - } - - /** - * Reads a zero-compressed encoded integer from input stream and returns it. - * @param stream Binary input stream - * @throws java.io.IOException - * @return deserialized integer from stream. - */ - public static int readVInt(DataInput stream) throws IOException { - return (int) readVLong(stream); - } - - /** - * Given the first byte of a vint/vlong, determine the sign - * @param value the first byte - * @return is the value negative - */ - public static boolean isNegativeVInt(byte value) { - return value < -120 || (value >= -112 && value < 0); - } - - /** - * Parse the first byte of a vint/vlong to determine the number of bytes - * @param value the first byte of the vint/vlong - * @return the total number of bytes (1 to 9) - */ - public static int decodeVIntSize(byte value) { - if (value >= -112) { - return 1; - } else if (value < -120) { - return -119 - value; + + /** + * Reads a zero-compressed encoded integer from input stream and returns it. + * + * @param stream Binary input stream + * @throws IOException + * @return deserialized integer from stream. + */ + public static int readVInt(DataInput stream) throws IOException { + return (int) readVLong(stream); } - return -111 - value; - } - - /** - * Get the encoded length if an integer is stored in a variable-length format - * @return the encoded length - */ - public static int getVIntSize(long i) { - if (i >= -112 && i <= 127) { - return 1; + + /** + * Given the first byte of a vint/vlong, determine the sign + * + * @param value the first byte + * @return is the value negative + */ + public static boolean isNegativeVInt(byte value) { + return value < -120 || (value >= -112 && value < 0); } - if (i < 0) { - i ^= -1L; // take one's complement' + /** + * Parse the first byte of a vint/vlong to determine the number of bytes + * + * @param value the first byte of the vint/vlong + * @return the total number of bytes (1 to 9) + */ + public static int decodeVIntSize(byte value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; } - // find the number of bytes with non-leading zeros - int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i); - // find the number of data bytes + length byte - return (dataBits + 7) / 8 + 1; - } - - /** - * Skip <i>len</i> number of bytes in input stream<i>in</i> - * @param in input stream - * @param len number of bytes to skip - * @throws IOException when skipped less number of bytes - */ - public static void skipFully(DataInput in, int len) throws IOException { - int total = 0; - int cur = 0; - - while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) { - total += cur; + + /** + * Get the encoded length if an integer is stored in a variable-length format + * + * @return the encoded length + */ + public static int getVIntSize(long i) { + if (i >= -112 && i <= 127) { + return 1; + } + + if (i < 0) { + i ^= -1L; // take one's complement' + } + // find the number of bytes with non-leading zeros + int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i); + // find the number of data bytes + length byte + return (dataBits + 7) / 8 + 1; } - if (total<len) { - throw new IOException("Not able to skip " + len + " bytes, possibly " + - "due to end of input."); + /** + * Skip <i>len</i> number of bytes in input stream<i>in</i> + * + * @param in input stream + * @param len number of bytes to skip + * @throws IOException when skipped less number of bytes + */ + public static void skipFully(DataInput in, int len) throws IOException { + int total = 0; + int cur = 0; + + while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) { + total += cur; + } + + if (total < len) { + throw new IOException("Not able to skip " + len + " bytes, possibly " + "due to end of input."); + } } - } }
