http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java index 7d5ce73..483f0df 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java @@ -23,11 +23,11 @@ import java.util.zip.CRC32; public class CRC32OutputStream extends OutputStream { private CRC32 hasher; - + public CRC32OutputStream() { hasher = new CRC32(); } - + public long getValue() { return hasher.getValue(); } @@ -40,5 +40,5 @@ public class CRC32OutputStream extends OutputStream { @Override public void write(byte[] bytes, int start, int end) throws IOException { hasher.update(bytes, start, end); - } + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java index ca9b010..677cf60 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java @@ -22,14 +22,14 @@ import java.util.TimerTask; public class ClojureTimerTask extends TimerTask { IFn _afn; - + public ClojureTimerTask(IFn afn) { super(); _afn = afn; } - + @Override public void run() { _afn.run(); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Container.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Container.java b/jstorm-core/src/main/java/backtype/storm/utils/Container.java index d4edcdf..0927e7c 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/Container.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/Container.java @@ -20,5 +20,5 @@ package backtype.storm.utils; import java.io.Serializable; public class Container implements Serializable { - public Object object; + public Object object; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java index b2a2a7d..03ede66 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java @@ -46,15 +46,15 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface { this.port = port; this.client = new DistributedRPC.Client(_protocol); } - + public String getHost() { return host; } - + public int getPort() { return port; } - + public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException { return client.execute(func, args); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java index 94768e6..330a5c6 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java @@ -32,13 +32,13 @@ public abstract class DisruptorQueue implements IStatefulObject { public static void setUseSleep(boolean useSleep) { DisruptorQueueImpl.setUseSleep(useSleep); } - + private static boolean CAPACITY_LIMITED = false; - + public static void setLimited(boolean limited) { CAPACITY_LIMITED = limited; } - + public static DisruptorQueue mkInstance(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { if (CAPACITY_LIMITED == true) { return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait); @@ -46,35 +46,35 @@ public abstract class DisruptorQueue implements IStatefulObject { return new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait); } } - + public abstract String getName(); - + public abstract void haltWithInterrupt(); - + public abstract Object poll(); - + public abstract Object take(); - + public abstract void consumeBatch(EventHandler<Object> handler); - + public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler); - + public abstract void publish(Object obj); - + public abstract void publish(Object obj, boolean block) throws InsufficientCapacityException; - + public abstract void consumerStarted(); - + public abstract void clear(); - + public abstract long population(); - + public abstract long capacity(); - + public abstract long writePos(); - + public abstract long readPos(); - + public abstract float pctFull(); - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java index 58d8313..2941cc9 100644 --- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java @@ -45,30 +45,30 @@ import com.lmax.disruptor.dsl.ProducerType; public class DisruptorQueueImpl extends DisruptorQueue { private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class); static boolean useSleep = true; - + public static void setUseSleep(boolean useSleep) { AbstractSequencerExt.setWaitSleep(useSleep); } - + private static final Object FLUSH_CACHE = new Object(); private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; - + private final String _queueName; private final RingBuffer<MutableObject> _buffer; private final Sequence _consumer; private final SequenceBarrier _barrier; - + // TODO: consider having a threadlocal cache of this variable to speed up // reads? volatile boolean consumerStartedFlag = false; - + private final HashMap<String, Object> state = new HashMap<String, Object>(4); private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); private final Lock readLock = cacheLock.readLock(); private final Lock writeLock = cacheLock.writeLock(); - + public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { this._queueName = PREFIX + queueName; _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait); @@ -89,19 +89,19 @@ public class DisruptorQueueImpl extends DisruptorQueue { } } } - + public String getName() { return _queueName; } - + public void consumeBatch(EventHandler<Object> handler) { consumeBatchToCursor(_barrier.getCursor(), handler); } - + public void haltWithInterrupt() { publish(INTERRUPT); } - + public Object poll() { // @@@ // should use _cache.isEmpty, but it is slow @@ -109,7 +109,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { if (consumerStartedFlag == false) { return _cache.poll(); } - + final long nextSequence = _consumer.get() + 1; if (nextSequence <= _barrier.getCursor()) { MutableObject mo = _buffer.get(nextSequence); @@ -120,7 +120,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { } return null; } - + public Object take() { // @@@ // should use _cache.isEmpty, but it is slow @@ -128,7 +128,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { if (consumerStartedFlag == false) { return _cache.poll(); } - + final long nextSequence = _consumer.get() + 1; // final long availableSequence; try { @@ -141,7 +141,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { // throw new RuntimeException(e); return null; } catch (TimeoutException e) { - //LOG.error(e.getCause(), e); + // LOG.error(e.getCause(), e); return null; } MutableObject mo = _buffer.get(nextSequence); @@ -150,7 +150,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { mo.setObject(null); return ret; } - + public void consumeBatchWhenAvailable(EventHandler<Object> handler) { try { final long nextSequence = _consumer.get() + 1; @@ -165,11 +165,11 @@ public class DisruptorQueueImpl extends DisruptorQueue { LOG.error("InterruptedException " + e.getCause()); return; } catch (TimeoutException e) { - //LOG.error(e.getCause(), e); + // LOG.error(e.getCause(), e); return; } } - + public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) { for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { @@ -202,7 +202,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { // TODO: only set this if the consumer cursor has changed? _consumer.set(cursor); } - + /* * Caches until consumerStarted is called, upon which the cache is flushed to the consumer */ @@ -213,15 +213,15 @@ public class DisruptorQueueImpl extends DisruptorQueue { throw new RuntimeException("This code should be unreachable!"); } } - + public void tryPublish(Object obj) throws InsufficientCapacityException { publish(obj, false); } - + public void publish(Object obj, boolean block) throws InsufficientCapacityException { - + boolean publishNow = consumerStartedFlag; - + if (!publishNow) { readLock.lock(); try { @@ -233,12 +233,12 @@ public class DisruptorQueueImpl extends DisruptorQueue { readLock.unlock(); } } - + if (publishNow) { publishDirect(obj, block); } } - + protected void publishDirect(Object obj, boolean block) throws InsufficientCapacityException { final long id; if (block) { @@ -250,41 +250,41 @@ public class DisruptorQueueImpl extends DisruptorQueue { m.setObject(obj); _buffer.publish(id); } - + public void consumerStarted() { - + writeLock.lock(); consumerStartedFlag = true; - + writeLock.unlock(); } - + public void clear() { while (population() != 0L) { poll(); } } - + public long population() { return (writePos() - readPos()); } - + public long capacity() { return _buffer.getBufferSize(); } - + public long writePos() { return _buffer.getCursor(); } - + public long readPos() { return _consumer.get(); } - + public float pctFull() { return (1.0F * population() / capacity()); } - + @Override public Object getState() { // get readPos then writePos so it's never an under-estimate @@ -296,7 +296,7 @@ public class DisruptorQueueImpl extends DisruptorQueue { state.put("read_pos", rp); return state; } - + public static class ObjectEventFactory implements EventFactory<MutableObject> { @Override public MutableObject newInstance() { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java index af5618b..5831a97 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java @@ -36,33 +36,33 @@ import com.lmax.disruptor.dsl.ProducerType; */ public class DisruptorWrapBlockingQueue extends DisruptorQueue { private static final Logger LOG = LoggerFactory.getLogger(DisruptorWrapBlockingQueue.class); - + private static final long QUEUE_CAPACITY = 512; private LinkedBlockingDeque<Object> queue; - + private String queueName; - + public DisruptorWrapBlockingQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { this.queueName = queueName; queue = new LinkedBlockingDeque<Object>(); } - + public String getName() { return queueName; } - + // poll method public void consumeBatch(EventHandler<Object> handler) { consumeBatchToCursor(0, handler); } - + public void haltWithInterrupt() { } - + public Object poll() { return queue.poll(); } - + public Object take() { try { return queue.take(); @@ -70,7 +70,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { return null; } } - + public void drainQueue(Object object, EventHandler<Object> handler) { while (object != null) { try { @@ -84,7 +84,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { } } } - + public void consumeBatchWhenAvailable(EventHandler<Object> handler) { Object object = queue.poll(); if (object == null) { @@ -96,16 +96,16 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { throw new RuntimeException(e); } } - + drainQueue(object, handler); - + } - + public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) { Object object = queue.poll(); drainQueue(object, handler); } - + /* * Caches until consumerStarted is called, upon which the cache is flushed to the consumer */ @@ -118,17 +118,17 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { } isSuccess = queue.offer(obj); } - + } - + public void tryPublish(Object obj) throws InsufficientCapacityException { boolean isSuccess = queue.offer(obj); if (isSuccess == false) { throw InsufficientCapacityException.INSTANCE; } - + } - + public void publish(Object obj, boolean block) throws InsufficientCapacityException { if (block == true) { publish(obj); @@ -136,21 +136,21 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { tryPublish(obj); } } - + public void consumerStarted() { } - + private void flushCache() { } - + public void clear() { queue.clear(); } - + public long population() { return queue.size(); } - + public long capacity() { long used = queue.size(); if (used < QUEUE_CAPACITY) { @@ -159,15 +159,15 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { return used; } } - + public long writePos() { return 0; } - + public long readPos() { return queue.size(); } - + public float pctFull() { long used = queue.size(); if (used < QUEUE_CAPACITY) { @@ -176,7 +176,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { return 1.0f; } } - + @Override public Object getState() { Map state = new HashMap<String, Object>(); @@ -189,12 +189,12 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue { state.put("read_pos", rp); return state; } - + public static class ObjectEventFactory implements EventFactory<MutableObject> { @Override public MutableObject newInstance() { return new MutableObject(); } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java index 4614366..e68898e 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java @@ -27,41 +27,44 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor{ +public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor { - public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); - } - - public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - } + public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } - public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); - } + public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } - public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - } + public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - if (t == null && r instanceof Future<?>) { - try { - Object result = ((Future<?>) r).get(); - } catch (CancellationException ce) { - t = ce; - } catch (ExecutionException ee) { - t = ee.getCause(); - } catch (InterruptedException ie) { - // If future got interrupted exception, we want to interrupt parent thread itself. - Thread.currentThread().interrupt(); - } + public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } - if (t != null) { - Utils.handleUncaughtException(t); + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future<?>) { + try { + Object result = ((Future<?>) r).get(); + } catch (CancellationException ce) { + t = ce; + } catch (ExecutionException ee) { + t = ee.getCause(); + } catch (InterruptedException ie) { + // If future got interrupted exception, we want to interrupt parent thread itself. + Thread.currentThread().interrupt(); + } + } + if (t != null) { + Utils.handleUncaughtException(t); + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java index c0190cc..2bc6e7d 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java @@ -17,7 +17,6 @@ */ package backtype.storm.utils; - import clojure.lang.ILookup; import clojure.lang.ISeq; import clojure.lang.AFn; @@ -65,16 +64,17 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap @Override public Object valAt(Object o) { - if(o instanceof Keyword) { + if (o instanceof Keyword) { return valAt(((Keyword) o).getName()); } return getMap().valAt(o); } - + @Override public Object valAt(Object o, Object def) { Object ret = valAt(o); - if(ret==null) ret = def; + if (ret == null) + ret = def; return ret; } @@ -92,30 +92,35 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap /* IPersistentMap */ /* Naive implementation, but it might be good enough */ public IPersistentMap assoc(Object k, Object v) { - if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v); - + if (k instanceof Keyword) + return assoc(((Keyword) k).getName(), v); + return new IndifferentAccessMap(getMap().assoc(k, v)); } public IPersistentMap assocEx(Object k, Object v) { - if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v); + if (k instanceof Keyword) + return assocEx(((Keyword) k).getName(), v); return new IndifferentAccessMap(getMap().assocEx(k, v)); } public IPersistentMap without(Object k) { - if(k instanceof Keyword) return without(((Keyword) k).getName()); + if (k instanceof Keyword) + return without(((Keyword) k).getName()); return new IndifferentAccessMap(getMap().without(k)); } public boolean containsKey(Object k) { - if(k instanceof Keyword) return containsKey(((Keyword) k).getName()); + if (k instanceof Keyword) + return containsKey(((Keyword) k).getName()); return getMap().containsKey(k); } public IMapEntry entryAt(Object k) { - if(k instanceof Keyword) return entryAt(((Keyword) k).getName()); + if (k instanceof Keyword) + return entryAt(((Keyword) k).getName()); return getMap().entryAt(k); } @@ -160,17 +165,20 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap public Collection values() { return ((Map) getMap()).values(); } - + /* Not implemented */ public void clear() { throw new UnsupportedOperationException(); } + public Object put(Object k, Object v) { throw new UnsupportedOperationException(); } + public void putAll(Map m) { throw new UnsupportedOperationException(); } + public Object remove(Object k) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java index b20c775..03c8e4b 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java @@ -25,32 +25,32 @@ public class InprocMessaging { private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>(); private static final Object _lock = new Object(); private static int port = 1; - + public static int acquireNewPort() { int ret; - synchronized(_lock) { + synchronized (_lock) { ret = port; port++; } return ret; } - + public static void sendMessage(int port, Object msg) { getQueue(port).add(msg); } - + public static Object takeMessage(int port) throws InterruptedException { return getQueue(port).take(); } public static Object pollMessage(int port) { - return getQueue(port).poll(); - } - + return getQueue(port).poll(); + } + private static LinkedBlockingQueue<Object> getQueue(int port) { - synchronized(_lock) { - if(!_queues.containsKey(port)) { - _queues.put(port, new LinkedBlockingQueue<Object>()); + synchronized (_lock) { + if (!_queues.containsKey(port)) { + _queues.put(port, new LinkedBlockingQueue<Object>()); } return _queues.get(port); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java index 3cb455d..661c045 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java @@ -33,9 +33,9 @@ public class KeyedRoundRobinQueue<V> { private int _currIndex = 0; public void add(Object key, V val) { - synchronized(_lock) { + synchronized (_lock) { Queue<V> queue = _queues.get(key); - if(queue==null) { + if (queue == null) { queue = new LinkedList<V>(); _queues.put(key, queue); _keyOrder.add(key); @@ -47,14 +47,14 @@ public class KeyedRoundRobinQueue<V> { public V take() throws InterruptedException { _size.acquire(); - synchronized(_lock) { + synchronized (_lock) { Object key = _keyOrder.get(_currIndex); Queue<V> queue = _queues.get(key); V ret = queue.remove(); - if(queue.isEmpty()) { + if (queue.isEmpty()) { _keyOrder.remove(_currIndex); _queues.remove(key); - if(_keyOrder.size()==0) { + if (_keyOrder.size() == 0) { _currIndex = 0; } else { _currIndex = _currIndex % _keyOrder.size(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java index 1e091f0..25e6878 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java @@ -25,11 +25,11 @@ import java.util.ListIterator; public class ListDelegate implements List<Object> { private List<Object> _delegate; - + public ListDelegate() { - _delegate = new ArrayList<Object>(); + _delegate = new ArrayList<Object>(); } - + public void setDelegate(List<Object> delegate) { _delegate = delegate; } @@ -37,7 +37,7 @@ public class ListDelegate implements List<Object> { public List<Object> getDelegate() { return _delegate; } - + @Override public int size() { return _delegate.size(); @@ -152,5 +152,5 @@ public class ListDelegate implements List<Object> { public List<Object> subList(int i, int i1) { return _delegate.subList(i, i1); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java index 0d8292f..843efb4 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java @@ -25,12 +25,9 @@ import java.util.HashMap; import java.io.IOException; /** - * A simple, durable, atomic K/V database. *Very inefficient*, should only be - * used for occasional reads/writes. Every read/write hits disk. + * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk. * - * @@@ - * Right now, This class hasn't upgrade to storm's LocalState - * It is need define every type in thrift, it is too complicated to do + * @@@ Right now, This class hasn't upgrade to storm's LocalState It is need define every type in thrift, it is too complicated to do */ public class LocalState { private VersionedStore _vs; @@ -46,8 +43,7 @@ public class LocalState { if (latestPath == null) return new HashMap<Object, Object>(); try { - return (Map<Object, Object>) Utils.javaDeserialize(FileUtils - .readFileToByteArray(new File(latestPath))); + return (Map<Object, Object>) Utils.javaDeserialize(FileUtils.readFileToByteArray(new File(latestPath))); } catch (IOException e) { attempts++; if (attempts >= 10) { @@ -65,8 +61,7 @@ public class LocalState { put(key, val, true); } - public synchronized void put(Object key, Object val, boolean cleanup) - throws IOException { + public synchronized void put(Object key, Object val, boolean cleanup) throws IOException { Map<Object, Object> curr = snapshot(); curr.put(key, val); persist(curr, cleanup); @@ -76,8 +71,7 @@ public class LocalState { remove(key, true); } - public synchronized void remove(Object key, boolean cleanup) - throws IOException { + public synchronized void remove(Object key, boolean cleanup) throws IOException { Map<Object, Object> curr = snapshot(); curr.remove(key); persist(curr, cleanup); @@ -87,8 +81,7 @@ public class LocalState { _vs.cleanup(keepVersions); } - private void persist(Map<Object, Object> val, boolean cleanup) - throws IOException { + private void persist(Map<Object, Object> val, boolean cleanup) throws IOException { byte[] toWrite = Utils.serialize(val); String newPath = _vs.createVersion(); FileUtils.writeByteArrayToFile(new File(newPath), toWrite); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java index eb57e99..b725084 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java @@ -17,16 +17,17 @@ */ package backtype.storm.utils; -import backtype.storm.generated.*; +import backtype.storm.generated.ClusterSummary; +import backtype.storm.generated.Nimbus; +import backtype.storm.generated.TopologySummary; import java.util.HashSet; -import java.util.List; -import java.util.Map; /** * Deprecated in JStorm + * * @author zhongyan.feng - * + * */ @Deprecated public class Monitor { @@ -106,17 +107,17 @@ public class Monitor { /** * @@@ Don't be compatible with Storm * - * Here skip the logic + * Here skip the logic * @param client * @param topology * @return * @throws Exception */ - private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{ + private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception { HashSet<String> components = new HashSet<String>(); ClusterSummary clusterSummary = client.getClusterInfo(); TopologySummary topologySummary = null; - for (TopologySummary ts: clusterSummary.get_topologies()) { + for (TopologySummary ts : clusterSummary.get_topologies()) { if (topology.equals(ts.get_name())) { topologySummary = ts; break; @@ -126,12 +127,12 @@ public class Monitor { throw new IllegalArgumentException("topology: " + topology + " not found"); } else { String id = topologySummary.get_id(); -// GetInfoOptions getInfoOpts = new GetInfoOptions(); -// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE); -// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts); -// for (ExecutorSummary es: info.get_executors()) { -// components.add(es.get_component_id()); -// } + // GetInfoOptions getInfoOpts = new GetInfoOptions(); + // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE); + // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts); + // for (ExecutorSummary es: info.get_executors()) { + // components.add(es.get_component_id()); + // } } return components; } @@ -161,7 +162,7 @@ public class Monitor { throw new IllegalArgumentException("stream name must be something"); } - if ( !WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) { + if (!WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) { throw new IllegalArgumentException("watch item must either be transferred or emitted"); } System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)"); @@ -189,7 +190,7 @@ public class Monitor { boolean streamFound = false; ClusterSummary clusterSummary = client.getClusterInfo(); TopologySummary topologySummary = null; - for (TopologySummary ts: clusterSummary.get_topologies()) { + for (TopologySummary ts : clusterSummary.get_topologies()) { if (_topology.equals(ts.get_name())) { topologySummary = ts; break; @@ -198,30 +199,30 @@ public class Monitor { if (topologySummary == null) { throw new IllegalArgumentException("topology: " + _topology + " not found"); } else { -// String id = topologySummary.get_id(); -// GetInfoOptions getInfoOpts = new GetInfoOptions(); -// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE); -// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts); -// for (ExecutorSummary es: info.get_executors()) { -// if (_component.equals(es.get_component_id())) { -// componentParallelism ++; -// ExecutorStats stats = es.get_stats(); -// if (stats != null) { -// Map<String,Map<String,Long>> statted = -// WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred(); -// if ( statted != null) { -// Map<String, Long> e2 = statted.get(":all-time"); -// if (e2 != null) { -// Long stream = e2.get(_stream); -// if (stream != null){ -// streamFound = true; -// totalStatted += stream; -// } -// } -// } -// } -// } -// } + // String id = topologySummary.get_id(); + // GetInfoOptions getInfoOpts = new GetInfoOptions(); + // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE); + // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts); + // for (ExecutorSummary es: info.get_executors()) { + // if (_component.equals(es.get_component_id())) { + // componentParallelism ++; + // ExecutorStats stats = es.get_stats(); + // if (stats != null) { + // Map<String,Map<String,Long>> statted = + // WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred(); + // if ( statted != null) { + // Map<String, Long> e2 = statted.get(":all-time"); + // if (e2 != null) { + // Long stream = e2.get(_stream); + // if (stream != null){ + // streamFound = true; + // totalStatted += stream; + // } + // } + // } + // } + // } + // } } if (componentParallelism <= 0) { @@ -242,8 +243,9 @@ public class Monitor { long stattedDelta = totalStatted - state.getLastStatted(); state.setLastTime(now); state.setLastStatted(totalStatted); - double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta); - System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput); + double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta); + System.out.println(_topology + "\t" + _component + "\t" + componentParallelism + "\t" + _stream + "\t" + timeDelta + "\t" + stattedDelta + "\t" + + throughput); } public void set_interval(int _interval) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java index 326ade0..aca3a24 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java @@ -23,21 +23,21 @@ public class MutableInt { public MutableInt(int val) { this.val = val; } - + public void set(int val) { this.val = val; } - + public int get() { return val; } - + public int increment() { return increment(1); } - + public int increment(int amt) { - val+=amt; + val += amt; return val; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java index a744c1c..2f4034e 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java @@ -23,21 +23,21 @@ public class MutableLong { public MutableLong(long val) { this.val = val; } - + public void set(long val) { this.val = val; } - + public long get() { return val; } - + public long increment() { return increment(1); } - + public long increment(long amt) { - val+=amt; + val += amt; return val; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java index d5cb7db..d0f928c 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java @@ -19,19 +19,19 @@ package backtype.storm.utils; public class MutableObject { Object o = null; - + public MutableObject() { - + } public MutableObject(Object o) { this.o = o; } - + public void setObject(Object o) { this.o = o; } - + public Object getObject() { return o; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java index 5829b67..ac76439 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java @@ -31,24 +31,24 @@ import backtype.storm.security.auth.ThriftConnectionType; public class NimbusClient extends ThriftClient { private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); - + private Nimbus.Client _client; private static String clientVersion = Utils.getVersion(); - + @SuppressWarnings("unchecked") public static NimbusClient getConfiguredClient(Map conf) { return getConfiguredClient(conf, null); } - + @SuppressWarnings("unchecked") public static NimbusClient getConfiguredClient(Map conf, Integer timeout) { return getConfiguredClientAs(conf, timeout, null); } - + public static NimbusClient getConfiguredClientAs(Map conf, String asUser) { return getConfiguredClientAs(conf, null, asUser); } - + public static void checkVersion(NimbusClient client) { String serverVersion; try { @@ -56,24 +56,24 @@ public class NimbusClient extends ThriftClient { } catch (TException e) { // TODO Auto-generated catch block LOG.warn("Failed to get nimbus version "); - return ; + return; } if (!clientVersion.equals(serverVersion)) { LOG.warn("Your client version: " + clientVersion + " but nimbus version: " + serverVersion); } } - + public static NimbusClient getConfiguredClientAs(Map conf, Integer timeout, String asUser) { try { - if(conf.containsKey(Config.STORM_DO_AS_USER)) { - if(asUser != null && !asUser.isEmpty()) { - LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence." - , asUser, conf.get(Config.STORM_DO_AS_USER)); + if (conf.containsKey(Config.STORM_DO_AS_USER)) { + if (asUser != null && !asUser.isEmpty()) { + LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.", asUser, + conf.get(Config.STORM_DO_AS_USER)); } asUser = (String) conf.get(Config.STORM_DO_AS_USER); } - - NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser); + + NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser); checkVersion(client); return client; } catch (Exception ex) { @@ -84,24 +84,24 @@ public class NimbusClient extends ThriftClient { public NimbusClient(Map conf, String host, int port) throws TTransportException { this(conf, host, port, null); } - + public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null); _client = new Nimbus.Client(_protocol); } - + public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser); _client = new Nimbus.Client(_protocol); } - + public NimbusClient(Map conf, String host) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, null, null, null); _client = new Nimbus.Client(_protocol); } - + public Nimbus.Client getClient() { return _client; } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java index 48053fc..fbaf03b 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java @@ -21,44 +21,42 @@ import java.util.HashMap; import java.util.UUID; /** - * This class is used as part of testing Storm. It is used to keep track of "global metrics" - * in an atomic way. For example, it is used for doing fine-grained detection of when a - * local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed - * tuples. + * This class is used as part of testing Storm. It is used to keep track of "global metrics" in an atomic way. For example, it is used for doing fine-grained + * detection of when a local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed tuples. */ public class RegisteredGlobalState { private static HashMap<String, Object> _states = new HashMap<String, Object>(); private static final Object _lock = new Object(); - + public static Object globalLock() { return _lock; } - + public static String registerState(Object init) { - synchronized(_lock) { + synchronized (_lock) { String id = UUID.randomUUID().toString(); _states.put(id, init); return id; } } - + public static void setState(String id, Object init) { - synchronized(_lock) { + synchronized (_lock) { _states.put(id, init); } } - + public static Object getState(String id) { - synchronized(_lock) { + synchronized (_lock) { Object ret = _states.get(id); - //System.out.println("State: " + ret.toString()); + // System.out.println("State: " + ret.toString()); return ret; - } + } } - + public static void clearState(String id) { - synchronized(_lock) { + synchronized (_lock) { _states.remove(id); - } + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java index 2ed0e33..db62e5c 100644 --- a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java @@ -24,18 +24,17 @@ import java.util.Map; import java.util.Map.Entry; /** - * Expires keys that have not been updated in the configured number of seconds. - * The algorithm used will take between expirationSecs and - * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message. - * + * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1 + * / (numBuckets-1)) to actually expire the message. + * * get, put, remove, containsKey, and size take O(numBuckets) time to run. - * - * The advantage of this design is that the expiration thread only locks the object - * for O(1) time, meaning the object is essentially always available for gets/puts. + * + * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for + * gets/puts. */ @Deprecated public class RotatingMap<K, V> { - //this default ensures things expire at most 50% past the expiration time + // this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; public static interface ExpiredCallback<K, V> { @@ -45,13 +44,13 @@ public class RotatingMap<K, V> { private LinkedList<HashMap<K, V>> _buckets; private ExpiredCallback _callback; - + public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) { - if(numBuckets<2) { + if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } _buckets = new LinkedList<HashMap<K, V>>(); - for(int i=0; i<numBuckets; i++) { + for (int i = 0; i < numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } @@ -64,13 +63,13 @@ public class RotatingMap<K, V> { public RotatingMap(int numBuckets) { this(numBuckets, null); - } - + } + public Map<K, V> rotate() { Map<K, V> dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); - if(_callback!=null) { - for(Entry<K, V> entry: dead.entrySet()) { + if (_callback != null) { + for (Entry<K, V> entry : dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } @@ -78,8 +77,8 @@ public class RotatingMap<K, V> { } public boolean containsKey(K key) { - for(HashMap<K, V> bucket: _buckets) { - if(bucket.containsKey(key)) { + for (HashMap<K, V> bucket : _buckets) { + if (bucket.containsKey(key)) { return true; } } @@ -87,8 +86,8 @@ public class RotatingMap<K, V> { } public V get(K key) { - for(HashMap<K, V> bucket: _buckets) { - if(bucket.containsKey(key)) { + for (HashMap<K, V> bucket : _buckets) { + if (bucket.containsKey(key)) { return bucket.get(key); } } @@ -99,16 +98,15 @@ public class RotatingMap<K, V> { Iterator<HashMap<K, V>> it = _buckets.iterator(); HashMap<K, V> bucket = it.next(); bucket.put(key, value); - while(it.hasNext()) { + while (it.hasNext()) { bucket = it.next(); bucket.remove(key); } } - - + public Object remove(K key) { - for(HashMap<K, V> bucket: _buckets) { - if(bucket.containsKey(key)) { + for (HashMap<K, V> bucket : _buckets) { + if (bucket.containsKey(key)) { return bucket.remove(key); } } @@ -117,9 +115,9 @@ public class RotatingMap<K, V> { public int size() { int size = 0; - for(HashMap<K, V> bucket: _buckets) { - size+=bucket.size(); + for (HashMap<K, V> bucket : _buckets) { + size += bucket.size(); } return size; - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java index 724bc3e..92dc2f7 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java @@ -24,24 +24,24 @@ import java.util.UUID; public class ServiceRegistry { private static HashMap<String, Object> _services = new HashMap<String, Object>(); private static final Object _lock = new Object(); - + public static String registerService(Object service) { - synchronized(_lock) { + synchronized (_lock) { String id = UUID.randomUUID().toString(); _services.put(id, service); return id; } } - + public static Object getService(String id) { - synchronized(_lock) { + synchronized (_lock) { return _services.get(id); - } + } } - + public static void unregisterService(String id) { - synchronized(_lock) { + synchronized (_lock) { _services.remove(id); - } + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java index 78f47d6..69af852 100644 --- a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java @@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory; public class ShellProcess implements Serializable { public static Logger LOG = LoggerFactory.getLogger(ShellProcess.class); public static Logger ShellLogger; - private Process _subprocess; - private InputStream processErrorStream; - private String[] command; - public ISerializer serializer; + private Process _subprocess; + private InputStream processErrorStream; + private String[] command; + public ISerializer serializer; public Number pid; public String componentName; @@ -63,9 +63,7 @@ public class ShellProcess implements Serializable { serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream()); this.pid = serializer.connect(conf, context); } catch (IOException e) { - throw new RuntimeException( - "Error when launching multilang subprocess\n" - + getErrorsString(), e); + throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e); } catch (NoOutputException e) { throw new RuntimeException(e + getErrorsString() + "\n"); } @@ -73,18 +71,18 @@ public class ShellProcess implements Serializable { } private ISerializer getSerializer(Map conf) { - //get factory class name - String serializer_className = (String)conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER); + // get factory class name + String serializer_className = (String) conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER); LOG.info("Storm multilang serializer: " + serializer_className); ISerializer serializer = null; try { - //create a factory class + // create a factory class Class klass = Class.forName(serializer_className); - //obtain a serializer object + // obtain a serializer object Object obj = klass.newInstance(); - serializer = (ISerializer)obj; - } catch(Exception e) { + serializer = (ISerializer) obj; + } catch (Exception e) { throw new RuntimeException("Failed to construct multilang serializer from serializer " + serializer_className, e); } return serializer; @@ -152,7 +150,7 @@ public class ShellProcess implements Serializable { } /** - * + * * @return pid, if the process has been launched, null otherwise. */ public Number getPid() { @@ -160,7 +158,7 @@ public class ShellProcess implements Serializable { } /** - * + * * @return the name of component. */ public String getComponentName() { @@ -168,13 +166,13 @@ public class ShellProcess implements Serializable { } /** - * + * * @return exit code of the process if process is terminated, -1 if process is not started or terminated. */ public int getExitCode() { try { return this._subprocess != null ? this._subprocess.exitValue() : -1; - } catch(IllegalThreadStateException e) { + } catch (IllegalThreadStateException e) { return -1; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java index 1065ff9..261cbb7 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java @@ -31,19 +31,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - abstract public class ShellUtils { public static Logger LOG = LoggerFactory.getLogger(ShellUtils.class); // OSType detection public enum OSType { - OS_TYPE_LINUX, - OS_TYPE_WIN, - OS_TYPE_SOLARIS, - OS_TYPE_MAC, - OS_TYPE_FREEBSD, - OS_TYPE_OTHER + OS_TYPE_LINUX, OS_TYPE_WIN, OS_TYPE_SOLARIS, OS_TYPE_MAC, OS_TYPE_FREEBSD, OS_TYPE_OTHER } public static final OSType osType = getOSType(); @@ -69,29 +62,27 @@ abstract public class ShellUtils { // Helper static vars for each platform public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN); public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS); - public static final boolean MAC = (osType == OSType.OS_TYPE_MAC); + public static final boolean MAC = (osType == OSType.OS_TYPE_MAC); public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD); - public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX); - public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER); - + public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX); + public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER); /** Token separator regex used to parse Shell tool outputs */ - public static final String TOKEN_SEPARATOR_REGEX - = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]"; + public static final String TOKEN_SEPARATOR_REGEX = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]"; - private long interval; // refresh interval in msec - private long lastTime; // last time the command was performed + private long interval; // refresh interval in msec + private long lastTime; // last time the command was performed final private boolean redirectErrorStream; // merge stdout and stderr private Map<String, String> environment; // env for the command execution private File dir; private Process process; // sub process used to execute the command private int exitCode; - /**Time after which the executing script would be timedout*/ + /** Time after which the executing script would be timedout */ protected long timeOutInterval = 0L; - /** If or not script timed out*/ + /** If or not script timed out */ private AtomicBoolean timedOut; - /**If or not script finished executing*/ + /** If or not script finished executing */ private volatile AtomicBoolean completed; public ShellUtils() { @@ -103,23 +94,26 @@ abstract public class ShellUtils { } /** - * @param interval the minimum duration to wait before re-executing the - * command. + * @param interval the minimum duration to wait before re-executing the command. */ public ShellUtils(long interval, boolean redirectErrorStream) { this.interval = interval; - this.lastTime = (interval<0) ? 0 : -interval; + this.lastTime = (interval < 0) ? 0 : -interval; this.redirectErrorStream = redirectErrorStream; } - /** set the environment for the command + /** + * set the environment for the command + * * @param env Mapping of environment variables */ protected void setEnvironment(Map<String, String> env) { this.environment = env; } - /** set the working directory + /** + * set the working directory + * * @param dir The directory where the command would be executed */ protected void setWorkingDirectory(File dir) { @@ -128,23 +122,18 @@ abstract public class ShellUtils { /** a Unix command to get the current user's groups list */ public static String[] getGroupsCommand() { - return (WINDOWS)? new String[]{"cmd", "/c", "groups"} - : new String[]{"bash", "-c", "groups"}; + return (WINDOWS) ? new String[] { "cmd", "/c", "groups" } : new String[] { "bash", "-c", "groups" }; } /** - * a Unix command to get a given user's groups list. - * If the OS is not WINDOWS, the command will get the user's primary group - * first and finally get the groups list which includes the primary group. - * i.e. the user's primary group will be included twice. + * a Unix command to get a given user's groups list. If the OS is not WINDOWS, the command will get the user's primary group first and finally get the + * groups list which includes the primary group. i.e. the user's primary group will be included twice. */ public static String[] getGroupsForUserCommand(final String user) { - //'groups username' command return is non-consistent across different unixes - return new String [] {"bash", "-c", "id -gn " + user - + "&& id -Gn " + user}; + // 'groups username' command return is non-consistent across different unixes + return new String[] { "bash", "-c", "id -gn " + user + "&& id -Gn " + user }; } - /** check to see if a command needs to be executed and execute if needed */ protected void run() throws IOException { if (lastTime + interval > System.currentTimeMillis()) @@ -174,51 +163,48 @@ abstract public class ShellUtils { if (timeOutInterval > 0) { timeOutTimer = new Timer("Shell command timeout"); timeoutTimerTask = new ShellTimeoutTimerTask(this); - //One time scheduling. + // One time scheduling. timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); } - final BufferedReader errReader = - new BufferedReader(new InputStreamReader(process - .getErrorStream())); - BufferedReader inReader = - new BufferedReader(new InputStreamReader(process - .getInputStream())); + final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); final StringBuffer errMsg = new StringBuffer(); // read error and input streams as this would free up the buffers // free the error stream buffer Thread errThread = new Thread() { - @Override - public void run() { - try { - String line = errReader.readLine(); - while((line != null) && !isInterrupted()) { - errMsg.append(line); - errMsg.append(System.getProperty("line.separator")); - line = errReader.readLine(); - } - } catch(IOException ioe) { - LOG.warn("Error reading the error stream", ioe); + @Override + public void run() { + try { + String line = errReader.readLine(); + while ((line != null) && !isInterrupted()) { + errMsg.append(line); + errMsg.append(System.getProperty("line.separator")); + line = errReader.readLine(); } + } catch (IOException ioe) { + LOG.warn("Error reading the error stream", ioe); } - }; + } + }; try { errThread.start(); - } catch (IllegalStateException ise) { } + } catch (IllegalStateException ise) { + } try { parseExecResult(inReader); // parse the output // clear the input stream buffer String line = inReader.readLine(); - while(line != null) { + while (line != null) { line = inReader.readLine(); } // wait for the process to finish and check the exit code - exitCode = process.waitFor(); + exitCode = process.waitFor(); // make sure that the error thread exits joinThread(errThread); completed.set(true); - //the timeout thread handling - //taken care in finally block + // the timeout thread handling + // taken care in finally block if (exitCode != 0) { throw new ExitCodeException(exitCode, errMsg.toString()); } @@ -233,10 +219,10 @@ abstract public class ShellUtils { // JDK 7 tries to automatically drain the input streams for us // when the process exits, but since close is not synchronized, // it creates a race if we close the stream first and the same - // fd is recycled. the stream draining thread will attempt to - // drain that fd!! it may block, OOM, or cause bizarre behavior + // fd is recycled. the stream draining thread will attempt to + // drain that fd!! it may block, OOM, or cause bizarre behavior // see: https://bugs.openjdk.java.net/browse/JDK-8024521 - // issue is fixed in build 7u60 + // issue is fixed in build 7u60 InputStream stdout = process.getInputStream(); synchronized (stdout) { inReader.close(); @@ -278,10 +264,11 @@ abstract public class ShellUtils { protected abstract String[] getExecString(); /** Parse the execution result */ - protected abstract void parseExecResult(BufferedReader lines) - throws IOException; + protected abstract void parseExecResult(BufferedReader lines) throws IOException; - /** get the current sub-process executing the given command + /** + * get the current sub-process executing the given command + * * @return process executing the command */ public Process getProcess() { @@ -306,18 +293,15 @@ abstract public class ShellUtils { /** * A simple shell command executor. - * - * <code>ShellCommandExecutor</code>should be used in cases where the output - * of the command needs no explicit parsing and where the command, working - * directory and the environment remains unchanged. The output of the command - * is stored as-is and is expected to be small. + * + * <code>ShellCommandExecutor</code>should be used in cases where the output of the command needs no explicit parsing and where the command, working + * directory and the environment remains unchanged. The output of the command is stored as-is and is expected to be small. */ public static class ShellCommandExecutor extends ShellUtils { private String[] command; private StringBuffer output; - public ShellCommandExecutor(String[] execString) { this(execString, null); } @@ -326,27 +310,22 @@ abstract public class ShellUtils { this(execString, dir, null); } - public ShellCommandExecutor(String[] execString, File dir, - Map<String, String> env) { - this(execString, dir, env , 0L); + public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) { + this(execString, dir, env, 0L); } /** * Create a new instance of the ShellCommandExecutor to execute a command. - * + * * @param execString The command to execute with arguments - * @param dir If not-null, specifies the directory which should be set - * as the current working directory for the command. - * If null, the current working directory is not modified. - * @param env If not-null, environment of the command will include the - * key-value pairs specified in the map. If null, the current - * environment is not modified. - * @param timeout Specifies the time in milliseconds, after which the - * command will be killed and the status marked as timedout. - * If 0, the command will not be timed out. + * @param dir If not-null, specifies the directory which should be set as the current working directory for the command. If null, the current working + * directory is not modified. + * @param env If not-null, environment of the command will include the key-value pairs specified in the map. If null, the current environment is not + * modified. + * @param timeout Specifies the time in milliseconds, after which the command will be killed and the status marked as timedout. If 0, the command will + * not be timed out. */ - public ShellCommandExecutor(String[] execString, File dir, - Map<String, String> env, long timeout) { + public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env, long timeout) { command = execString.clone(); if (dir != null) { setWorkingDirectory(dir); @@ -357,7 +336,6 @@ abstract public class ShellUtils { timeOutInterval = timeout; } - /** Execute the shell command. */ public void execute() throws IOException { this.run(); @@ -373,21 +351,19 @@ abstract public class ShellUtils { output = new StringBuffer(); char[] buf = new char[512]; int nRead; - while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) { + while ((nRead = lines.read(buf, 0, buf.length)) > 0) { output.append(buf, 0, nRead); } } - /** Get the output of the shell command.*/ + /** Get the output of the shell command. */ public String getOutput() { return (output == null) ? "" : output.toString(); } /** - * Returns the commands of this instance. - * Arguments with spaces in are presented with quotes round; other - * arguments are presented raw - * + * Returns the commands of this instance. Arguments with spaces in are presented with quotes round; other arguments are presented raw + * * @return a string representation of the object. */ @Override @@ -407,9 +383,8 @@ abstract public class ShellUtils { } /** - * To check if the passed script to shell command executor timed out or - * not. - * + * To check if the passed script to shell command executor timed out or not. + * * @return if the script timed out. */ public boolean isTimedOut() { @@ -418,52 +393,45 @@ abstract public class ShellUtils { /** * Set if the command has timed out. - * + * */ private void setTimedOut() { this.timedOut.set(true); } - /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the <code>Shell</code> interface. + * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface. + * * @param cmd shell command to execute. * @return the output of the executed command. */ - public static String execCommand(String ... cmd) throws IOException { + public static String execCommand(String... cmd) throws IOException { return execCommand(null, cmd, 0L); } /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the <code>Shell</code> interface. + * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface. + * * @param env the map of environment key=value * @param cmd shell command to execute. * @param timeout time in milliseconds after which script should be marked timeout * @return the output of the executed command.o */ - public static String execCommand(Map<String, String> env, String[] cmd, - long timeout) throws IOException { - ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, - timeout); + public static String execCommand(Map<String, String> env, String[] cmd, long timeout) throws IOException { + ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, timeout); exec.execute(); return exec.getOutput(); } /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the <code>Shell</code> interface. + * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface. + * * @param env the map of environment key=value * @param cmd shell command to execute. * @return the output of the executed command. */ - public static String execCommand(Map<String,String> env, String ... cmd) - throws IOException { + public static String execCommand(Map<String, String> env, String... cmd) throws IOException { return execCommand(env, cmd, 0L); } @@ -484,9 +452,9 @@ abstract public class ShellUtils { try { p.exitValue(); } catch (Exception e) { - //Process has not terminated. - //So check if it has completed - //if not just destroy it. + // Process has not terminated. + // So check if it has completed + // if not just destroy it. if (p != null && !shell.completed.get()) { shell.setTimedOut(); p.destroy(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java index 4aa5556..dd57832 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java @@ -31,12 +31,9 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko private final int linearBaseSleepMs; /** - * The class provides generic exponential-linear backoff retry strategy for - * storm. It calculates threshold for exponentially increasing sleeptime - * for retries. Beyond this threshold, the sleeptime increase is linear. - * Also adds jitter for exponential/linear retry. - * It guarantees currSleepTimeMs >= prevSleepTimeMs and - * baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs + * The class provides generic exponential-linear backoff retry strategy for storm. It calculates threshold for exponentially increasing sleeptime for + * retries. Beyond this threshold, the sleeptime increase is linear. Also adds jitter for exponential/linear retry. It guarantees currSleepTimeMs >= + * prevSleepTimeMs and baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs */ public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) { @@ -44,17 +41,15 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko expRetriesThreshold = 1; while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2)) expRetriesThreshold++; - LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " + - "the maxRetries [" + maxRetries + "]"); + LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " + "the maxRetries [" + maxRetries + "]"); if (baseSleepTimeMs > maxSleepTimeMs) { - LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " + - "the maxSleepTimeMs [" + maxSleepTimeMs + "]."); + LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " + "the maxSleepTimeMs [" + maxSleepTimeMs + "]."); } - if( maxRetries > 0 && maxRetries > expRetriesThreshold ) { + if (maxRetries > 0 && maxRetries > expRetriesThreshold) { this.stepSize = Math.max(1, (maxSleepTimeMs - (1 << expRetriesThreshold)) / (maxRetries - expRetriesThreshold)); } else { this.stepSize = 1; - } + } this.linearBaseSleepMs = super.getBaseSleepTimeMs() + (1 << expRetriesThreshold); } @@ -67,8 +62,7 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko return sleepTimeMs; } else { int stepJitter = random.nextInt(stepSize); - return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs + - (stepSize * (retryCount - expRetriesThreshold)) + stepJitter)); + return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs + (stepSize * (retryCount - expRetriesThreshold)) + stepJitter)); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java index 276559c..f905ae4 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java @@ -25,9 +25,7 @@ import java.util.Map; public class TestUtils extends Utils { - public static void testSetupBuilder(CuratorFrameworkFactory.Builder - builder, String zkStr, Map conf, ZookeeperAuthInfo auth) - { + public static void testSetupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) { setupBuilder(builder, zkStr, conf, auth); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java index e3ab03f..c43ff06 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java @@ -23,14 +23,14 @@ public class ThreadResourceManager<T> { public static interface ResourceFactory<X> { X makeResource(); } - + ResourceFactory<T> _factory; ConcurrentLinkedQueue<T> _resources = new ConcurrentLinkedQueue<T>(); - + public ThreadResourceManager(ResourceFactory<T> factory) { _factory = factory; } - + public T acquire() { T ret = _resources.poll(); if (ret == null) { @@ -38,7 +38,7 @@ public class ThreadResourceManager<T> { } return ret; } - + public void release(T resource) { _resources.add(resource); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java index 47a48c7..c872721 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java @@ -17,17 +17,13 @@ */ package backtype.storm.utils; +import backtype.storm.generated.*; + import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; - public class ThriftTopologyUtils { public static Set<String> getComponentIds(StormTopology topology) { Set<String> ret = new HashSet<String>(); @@ -37,7 +33,7 @@ public class ThriftTopologyUtils { } return ret; } - + public static Map<String, Object> getComponents(StormTopology topology) { Map<String, Object> ret = new HashMap<String, Object>(); for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) { @@ -46,7 +42,7 @@ public class ThriftTopologyUtils { } return ret; } - + public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) { for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) { Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
